This commit is contained in:
crusader 2017-11-26 19:15:51 +09:00
parent af7b7c14bd
commit 2286a5021e
44 changed files with 1288 additions and 1172 deletions

2
.gitignore vendored
View File

@ -66,5 +66,3 @@ glide.lock
.DS_Store .DS_Store
dist/ dist/
debug debug

View File

@ -26,7 +26,7 @@ type Client interface {
Connect() error Connect() error
Close() Close()
Notify(method string, args ...interface{}) (err error) Send(method string, args ...interface{}) (err error)
Call(result interface{}, method string, args ...interface{}) error Call(result interface{}, method string, args ...interface{}) error
CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error)
} }
@ -37,10 +37,10 @@ type client struct {
conn net.Conn conn net.Conn
pendingRequestsCount uint32 pendingRequestsCount uint32
pendingRequests map[uint64]*CallState pendingRequests map[uint64]*RequestState
pendingRequestsLock sync.Mutex pendingRequestsLock sync.Mutex
requestQueueChan chan *CallState requestQueueChan chan *RequestState
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
@ -58,8 +58,8 @@ func (c *client) Connect() error {
return err return err
} }
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
c.requestQueueChan = make(chan *CallState, c.ch.GetPendingRequests()) c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests())
c.pendingRequests = make(map[uint64]*CallState) c.pendingRequests = make(map[uint64]*RequestState)
go c.handleRPC() go c.handleRPC()
@ -75,8 +75,8 @@ func (c *client) Close() {
c.stopChan = nil c.stopChan = nil
} }
func (c *client) Notify(method string, args ...interface{}) (err error) { func (c *client) Send(method string, args ...interface{}) (err error) {
var cs *CallState var cs *RequestState
if cs, err = c.send(true, false, nil, method, args...); nil != err { if cs, err = c.send(true, false, nil, method, args...); nil != err {
return return
} }
@ -84,7 +84,7 @@ func (c *client) Notify(method string, args ...interface{}) (err error) {
select { select {
case <-cs.DoneChan: case <-cs.DoneChan:
err = cs.Error err = cs.Error
ReleaseCallState(cs) releaseCallState(cs)
} }
return return
@ -95,7 +95,7 @@ func (c *client) Call(result interface{}, method string, args ...interface{}) er
} }
func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) { func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) {
var cs *CallState var cs *RequestState
if cs, err = c.send(true, true, result, method, args...); nil != err { if cs, err = c.send(true, true, result, method, args...); nil != err {
return return
} }
@ -105,7 +105,7 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s
select { select {
case <-cs.DoneChan: case <-cs.DoneChan:
result, err = cs.Result, cs.Error result, err = cs.Result, cs.Error
ReleaseCallState(cs) releaseCallState(cs)
case <-t.C: case <-t.C:
cs.Cancel() cs.Cancel()
err = getClientTimeoutError(c, timeout) err = getClientTimeoutError(c, timeout)
@ -116,21 +116,21 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s
return return
} }
func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *CallState, err error) { func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *RequestState, err error) {
if !hasResponse { if !hasResponse {
usePool = true usePool = true
} }
if usePool { if usePool {
cs = RetainCallState() cs = retainRequestState()
} else { } else {
cs = &CallState{} cs = &RequestState{}
} }
cs.hasResponse = hasResponse cs.hasResponse = hasResponse
cs.Method = method cs.Method = method
cs.Args = args cs.Args = args
cs.DoneChan = make(chan *CallState, 1) cs.DoneChan = make(chan *RequestState, 1)
if hasResponse { if hasResponse {
cs.ID = c.ch.GetRequestID() cs.ID = c.ch.GetRequestID()
@ -149,7 +149,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
// Immediately notify the caller not interested // Immediately notify the caller not interested
// in the response on requests' queue overflow, since // in the response on requests' queue overflow, since
// there are no other ways to notify it later. // there are no other ways to notify it later.
ReleaseCallState(cs) releaseCallState(cs)
return nil, getClientOverflowError(c) return nil, getClientOverflowError(c)
} }
@ -160,7 +160,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
//close(rcs.DoneChan) //close(rcs.DoneChan)
rcs.Done() rcs.Done()
} else { } else {
ReleaseCallState(rcs) releaseCallState(rcs)
} }
default: default:
} }
@ -171,7 +171,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
default: default:
// Release m even if usePool = true, since m wasn't exposed // Release m even if usePool = true, since m wasn't exposed
// to the caller yet. // to the caller yet.
ReleaseCallState(cs) releaseCallState(cs)
return nil, getClientOverflowError(c) return nil, getClientOverflowError(c)
} }
} }
@ -229,7 +229,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
}() }()
for { for {
var cs *CallState var cs *RequestState
select { select {
case cs = <-c.requestQueueChan: case cs = <-c.requestQueueChan:
@ -250,7 +250,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
// close(m.done) // close(m.done)
cs.Done() cs.Done()
} else { } else {
ReleaseCallState(cs) releaseCallState(cs)
} }
continue continue
} }
@ -268,11 +268,12 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
continue continue
} }
} }
var requestID interface{} var requestID interface{}
if 0 < cs.ID { if 0 < cs.ID {
requestID = cs.ID requestID = cs.ID
} }
err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, requestID) err = c.ch.GetCodec().WriteRequest(c.conn, cs.Method, cs.Args, requestID)
if !cs.hasResponse { if !cs.hasResponse {
cs.Error = err cs.Error = err
cs.Done() cs.Done()
@ -302,7 +303,7 @@ func (c *client) rpcReader(readerDone chan<- error) {
}() }()
for { for {
crn, err := c.ch.GetCodec().NewResponseOrNotify(c.conn) msg, err := c.ch.GetCodec().NewMessage(c.conn)
if nil != err { if nil != err {
if err == io.ErrUnexpectedEOF || err == io.EOF { if err == io.ErrUnexpectedEOF || err == io.EOF {
logging.Logger().Info("Client: disconnected from server") logging.Logger().Info("Client: disconnected from server")
@ -313,11 +314,14 @@ func (c *client) rpcReader(readerDone chan<- error) {
continue continue
} }
if crn.IsResponse() { switch msg.MessageType() {
err = c.responseHandle(crn.GetResponse()) case protocol.MessageTypeResponse:
} else { c.handleResponse(msg)
err = c.notifyHandle(crn.GetNotify()) case protocol.MessageTypeNotification:
c.handleNotification(msg)
default:
} }
if nil != err { if nil != err {
logging.Logger().Error(err.Error()) logging.Logger().Error(err.Error())
continue continue
@ -326,9 +330,16 @@ func (c *client) rpcReader(readerDone chan<- error) {
} }
func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) error { func (c *client) handleResponse(msg protocol.ClientMessageCodec) error {
codec, err := msg.MessageCodec()
if nil != err {
return err
}
resCodec := codec.(protocol.ClientResponseCodec)
c.pendingRequestsLock.Lock() c.pendingRequestsLock.Lock()
id := reflect.ValueOf(codecResponse.ID()).Convert(uint64Type).Uint() id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint()
cs, ok := c.pendingRequests[id] cs, ok := c.pendingRequests[id]
if ok { if ok {
@ -337,15 +348,15 @@ func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) erro
c.pendingRequestsLock.Unlock() c.pendingRequestsLock.Unlock()
if !ok { if !ok {
return fmt.Errorf("Client: Unexpected ID=[%v] obtained from server", codecResponse.ID()) return fmt.Errorf("Client: Unexpected ID=[%v] obtained from server", resCodec.ID())
} }
atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0)) atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0))
if err := codecResponse.Result(cs.Result); nil != err { if err := resCodec.Result(cs.Result); nil != err {
log.Printf("responseHandle:%v", err) log.Printf("responseHandle:%v", err)
} }
if err := codecResponse.Error(); nil != err { if err := resCodec.Error(); nil != err {
log.Printf("responseHandle:%v", err) log.Printf("responseHandle:%v", err)
// cs.Error = &ClientError{ // cs.Error = &ClientError{
// Server: true, // Server: true,
@ -358,8 +369,15 @@ func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) erro
return nil return nil
} }
func (c *client) notifyHandle(codecNotify protocol.ClientCodecNotify) error { func (c *client) handleNotification(msg protocol.ClientMessageCodec) error {
_, err := c.ch.GetRPCRegistry().Invoke(codecNotify) codec, err := msg.MessageCodec()
if nil != err {
return err
}
notiCodec := codec.(protocol.ClientNotificationCodec)
_, err = c.ch.GetRPCRegistry().Invoke(notiCodec)
return err return err
} }

View File

@ -6,24 +6,23 @@ import (
"time" "time"
) )
var callStatePool sync.Pool
var zeroTime time.Time var zeroTime time.Time
type CallState struct { type RequestState struct {
ID uint64 ID uint64
Method string Method string
Args interface{} Args interface{}
Result interface{} Result interface{}
Error error Error error
DoneChan chan *CallState DoneChan chan *RequestState
hasResponse bool hasResponse bool
canceled uint32 canceled uint32
} }
func (cs *CallState) Done() { func (rs *RequestState) Done() {
select { select {
case cs.DoneChan <- cs: case rs.DoneChan <- rs:
// ok // ok
default: default:
// We don't want to block here. It is the caller's responsibility to make // We don't want to block here. It is the caller's responsibility to make
@ -42,28 +41,30 @@ func (cs *CallState) Done() {
// //
// It is safe calling this function multiple times from concurrently // It is safe calling this function multiple times from concurrently
// running goroutines. // running goroutines.
func (cs *CallState) Cancel() { func (rs *RequestState) Cancel() {
atomic.StoreUint32(&cs.canceled, 1) atomic.StoreUint32(&rs.canceled, 1)
} }
func (cs *CallState) IsCanceled() bool { func (rs *RequestState) IsCanceled() bool {
return atomic.LoadUint32(&cs.canceled) != 0 return atomic.LoadUint32(&rs.canceled) != 0
} }
func RetainCallState() *CallState { var requestStatePool sync.Pool
v := callStatePool.Get()
func retainRequestState() *RequestState {
v := requestStatePool.Get()
if v == nil { if v == nil {
return &CallState{} return &RequestState{}
} }
return v.(*CallState) return v.(*RequestState)
} }
func ReleaseCallState(cs *CallState) { func releaseCallState(rs *RequestState) {
cs.Method = "" rs.Method = ""
cs.Args = nil rs.Args = nil
cs.Result = nil rs.Result = nil
cs.Error = nil rs.Error = nil
cs.DoneChan = nil rs.DoneChan = nil
callStatePool.Put(cs) requestStatePool.Put(rs)
} }

23
codec/codec.go Normal file
View File

@ -0,0 +1,23 @@
package codec
import "io"
// Codec interface contains the encoder for response or decoder for request.
// Eg. gzip, flate compressions.
type Codec interface {
Encode(w io.Writer) io.Writer
Decode(r io.Reader) io.Reader
}
type codec struct {
}
func (_ *codec) Encode(w io.Writer) io.Writer {
return w
}
func (_ *codec) Decode(r io.Reader) io.Reader {
return r
}
var DefaultCodec = &codec{}

25
codec/selector.go Normal file
View File

@ -0,0 +1,25 @@
package codec
import "io"
// CodecSelector interface provides a way to select encoder using the http
// request. Typically people can use this to check HEADER of the request and
// figure out client capabilities.
// Eg. "Accept-Encoding" tells about supported compressions.
type CodecSelector interface {
SelectByReader(r io.Reader) Codec
SelectByWriter(w io.Writer) Codec
}
type codecSelector struct {
}
func (_ *codecSelector) SelectByReader(_ io.Reader) Codec {
return DefaultCodec
}
func (_ *codecSelector) SelectByWriter(_ io.Writer) Codec {
return DefaultCodec
}
var DefaultCodecSelector = &codecSelector{}

5
constants.go Normal file
View File

@ -0,0 +1,5 @@
package rpc
const (
DefaultPendingMessages = 32 * 1024
)

View File

@ -1,23 +0,0 @@
package encode
import "io"
// Encoder interface contains the encoder for response.
// Eg. gzip, flate compressions.
type Encoder interface {
Encode(w io.Writer) io.Writer
Decode(r io.Reader) io.Reader
}
type encoder struct {
}
func (_ *encoder) Encode(w io.Writer) io.Writer {
return w
}
func (_ *encoder) Decode(r io.Reader) io.Reader {
return r
}
var DefaultEncoder = &encoder{}

View File

@ -1,25 +0,0 @@
package encode
import "io"
// EncoderSelector interface provides a way to select encoder using the http
// request. Typically people can use this to check HEADER of the request and
// figure out client capabilities.
// Eg. "Accept-Encoding" tells about supported compressions.
type EncoderSelector interface {
SelectByReader(r io.Reader) Encoder
SelectByWriter(w io.Writer) Encoder
}
type encoderSelector struct {
}
func (_ *encoderSelector) SelectByReader(_ io.Reader) Encoder {
return DefaultEncoder
}
func (_ *encoderSelector) SelectByWriter(_ io.Writer) Encoder {
return DefaultEncoder
}
var DefaultEncoderSelector = &encoderSelector{}

View File

@ -1,19 +0,0 @@
package gateway
import (
"context"
"io"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
func Handle(ctx context.Context, sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error {
return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) {
var params []string
if params, err = codecReq.Params(); nil != err {
return nil, err
}
return sh.Invoke(ctx, codecReq.Method(), params)
})
}

View File

@ -1,13 +0,0 @@
package gateway
import (
"context"
"git.loafle.net/commons_go/rpc"
)
type ServerHandler interface {
rpc.ServerHandler
Invoke(ctx context.Context, method string, params []string) (result interface{}, err error)
}

View File

@ -1,21 +0,0 @@
package gateway
import (
"context"
"errors"
"git.loafle.net/commons_go/rpc"
)
type ServerHandlers struct {
rpc.ServerHandlers
}
func (sh *ServerHandlers) Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) {
return nil, errors.New("Server: Handler method[Invoke] of Server is not implement")
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -0,0 +1,8 @@
package gateway
import "git.loafle.net/commons_go/rpc"
type ServletHandler interface {
rpc.ServletHandler
}

View File

@ -0,0 +1,9 @@
package gateway
import (
"git.loafle.net/commons_go/rpc"
)
type ServletHandlers struct {
rpc.ServletHandlers
}

View File

@ -1,39 +0,0 @@
package rpc
import (
"io"
"git.loafle.net/commons_go/rpc/protocol"
)
type Invoker func(codecReq protocol.ServerCodecRequest) (result interface{}, err error)
func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer, invoker Invoker) error {
sh.OnPreRead(r)
// Create a new codec request.
codecReq, errNew := codec.NewRequest(r)
defer func() {
if nil != codecReq {
codecReq.Complete()
}
}()
if nil != errNew {
return errNew
}
sh.OnPostRead(r)
result, err := invoker(codecReq)
if nil != err {
sh.OnPreWriteError(w, err)
codecReq.WriteError(w, 400, err)
sh.OnPostWriteError(w, err)
} else {
sh.OnPreWriteResult(w, result)
codecReq.WriteResponse(w, result)
sh.OnPostWriteResult(w, result)
}
return nil
}

View File

@ -1,7 +0,0 @@
package notify
const (
// DefaultPendingNotifies is the default number of pending messages
// handled by Client and Server.
DefaultPendingNotifies = 1024
)

View File

@ -1,163 +0,0 @@
package notify
import (
"fmt"
"log"
"net"
"runtime"
"sync"
"git.loafle.net/commons_go/rpc/client"
)
func New(nh NotifyHandler) Notifier {
n := &notify{
nh: nh,
}
return n
}
type Notifier interface {
Start(conn net.Conn)
Close()
Notify(method string, args ...interface{}) (err error)
}
type notify struct {
nh NotifyHandler
conn net.Conn
notifyQueueChan chan *client.CallState
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (n *notify) Start(conn net.Conn) {
n.nh.Validate()
if n.stopChan != nil {
panic("RPC Notify: the given notify is already started. Call Notifier.Stop() before calling Notifier.Start(conn) again!")
}
n.conn = conn
n.stopChan = make(chan struct{})
n.notifyQueueChan = make(chan *client.CallState, n.nh.GetPendingNotifies())
go n.handleRPC()
}
func (n *notify) Close() {
if n.stopChan == nil {
panic("RPC Notify: the notify must be started before stopping it")
}
close(n.stopChan)
n.stopWg.Wait()
n.stopChan = nil
}
func (n *notify) Notify(method string, args ...interface{}) (err error) {
var cs *client.CallState
if cs, err = n.send(method, args...); nil != err {
return
}
select {
case <-cs.DoneChan:
err = cs.Error
client.ReleaseCallState(cs)
}
return
}
func (n *notify) send(method string, args ...interface{}) (cs *client.CallState, err error) {
cs = client.RetainCallState()
cs.Method = method
cs.Args = args
cs.DoneChan = make(chan *client.CallState, 1)
select {
case n.notifyQueueChan <- cs:
return cs, nil
default:
client.ReleaseCallState(cs)
return nil, getClientOverflowError(n)
}
}
func (n *notify) handleRPC() {
subStopChan := make(chan struct{})
writerDone := make(chan error, 1)
go n.rpcWriter(subStopChan, writerDone)
var err error
select {
case err = <-writerDone:
close(subStopChan)
case <-n.stopChan:
close(subStopChan)
<-writerDone
}
if err != nil {
//n.LogError("%s", err)
log.Printf("handleRPC: %v", err)
err = &client.ClientError{
Connection: true,
Err: err,
}
}
}
func (n *notify) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
var err error
defer func() {
writerDone <- err
}()
for {
var cs *client.CallState
select {
case cs = <-n.notifyQueueChan:
default:
// Give the last chance for ready goroutines filling n.requestsChan :)
runtime.Gosched()
select {
case <-stopChan:
return
case cs = <-n.notifyQueueChan:
}
}
if cs.IsCanceled() {
client.ReleaseCallState(cs)
continue
}
err = n.nh.GetCodec().Write(n.conn, cs.Method, cs.Args, nil)
cs.Error = err
cs.Done()
if nil != err {
err = fmt.Errorf("RPC Notify: Cannot send notify to wire: [%s]", err)
return
}
}
}
func getClientOverflowError(n *notify) error {
err := fmt.Errorf("RPC Notify: Notifies' queue with size=%d is overflown. Try increasing NotifyHandler.PendingNotifies value", cap(n.notifyQueueChan))
//c.LogError("%s", err)
return &client.ClientError{
Overflow: true,
Err: err,
}
}

View File

@ -1,12 +0,0 @@
package notify
import (
"git.loafle.net/commons_go/rpc/protocol"
)
type NotifyHandler interface {
GetCodec() protocol.ClientCodec
GetPendingNotifies() int
Validate()
}

View File

@ -1,32 +0,0 @@
package notify
import (
"git.loafle.net/commons_go/rpc/protocol"
)
type NotifyHandlers struct {
Codec protocol.ClientCodec
// The maximum number of pending requests in the queue.
//
// The number of pending requsts should exceed the expected number
// of concurrent goroutines calling client's methods.
// Otherwise a lot of ClientError.Overflow errors may appear.
//
// Default is DefaultPendingNotifies.
PendingNotifies int
}
func (nh *NotifyHandlers) GetCodec() protocol.ClientCodec {
return nh.Codec
}
func (nh *NotifyHandlers) GetPendingNotifies() int {
return nh.PendingNotifies
}
func (nh *NotifyHandlers) Validate() {
if nh.PendingNotifies <= 0 {
nh.PendingNotifies = DefaultPendingNotifies
}
}

View File

@ -4,29 +4,42 @@ import (
"io" "io"
) )
type MessageType int
const (
MessageTypeUnknown MessageType = iota
MessageTypeRequest
MessageTypeResponse
MessageTypeNotification
)
// ClientCodec creates a ClientCodecRequest to process each request. // ClientCodec creates a ClientCodecRequest to process each request.
type ClientCodec interface { type ClientCodec interface {
Write(w io.Writer, method string, args interface{}, id interface{}) error WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error
NewResponseOrNotify(rc io.Reader) (ClientCodecResponseOrNotify, error) NewMessage(rc io.Reader) (ClientMessageCodec, error)
} }
// ClientCodecResponseOrNotify encodes a response or notify using a specific type ClientRequestCodec interface {
// serialization scheme. RegistryCodec
type ClientCodecResponseOrNotify interface { }
IsResponse() bool
IsNotify() bool // ClientMessageCodec decodes a response or notification using a specific
GetResponse() ClientCodecResponse // serialization scheme.
GetNotify() ClientCodecNotify type ClientMessageCodec interface {
Complete() MessageType() MessageType
} // Reads the message filling the RPC response or notification.
MessageCodec() (interface{}, error)
type ClientCodecResponse interface {
ID() interface{} Close()
Result(result interface{}) error }
Error() error
Complete() type ClientResponseCodec interface {
} Result(result interface{}) error
Error() error
type ClientCodecNotify interface { ID() interface{}
Close()
}
type ClientNotificationCodec interface {
RegistryCodec RegistryCodec
} }

View File

@ -6,48 +6,15 @@ import (
"io" "io"
"sync" "sync"
"git.loafle.net/commons_go/rpc/encode" "git.loafle.net/commons_go/rpc/codec"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
) )
// ---------------------------------------------------------------------------- // clientMessage represents a JSON-RPC message sent to a client.
// Request and Response type clientMessage struct {
// ---------------------------------------------------------------------------- Version string `json:"jsonrpc"`
MessageType protocol.MessageType `json:"messageType"`
// clientRequest represents a JSON-RPC request sent by a client. Message *json.RawMessage `json:"message"`
type clientRequest struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// Object to pass as request parameter to the method.
Params interface{} `json:"params"`
// The request id. This can be of any type. It is used to match the
// response with the request that it is replying to.
ID interface{} `json:"id"`
}
// clientResponse represents a JSON-RPC response returned to a client.
type clientResponse struct {
Version string `json:"jsonrpc"`
Result *json.RawMessage `json:"result,omitempty"`
Error error `json:"error,omitempty"`
ID interface{} `json:"id,omitempty"`
}
// clientRequest represents a JSON-RPC request sent by a client.
type clientNotify struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// Object to pass as request parameter to the method.
Params *json.RawMessage `json:"params,omitempty"`
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -55,29 +22,27 @@ type clientNotify struct {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// NewCustomClientCodec returns a new JSON Codec based on passed encoder selector. // NewCustomClientCodec returns a new JSON Codec based on passed encoder selector.
func NewCustomClientCodec(encSel encode.EncoderSelector) *ClientCodec { func NewCustomClientCodec(codecSel codec.CodecSelector) protocol.ClientCodec {
return &ClientCodec{encSel: encSel} return &ClientCodec{codecSel: codecSel}
} }
// NewClientCodec returns a new JSON Codec. // NewClientCodec returns a new JSON Codec.
func NewClientCodec() *ClientCodec { func NewClientCodec() protocol.ClientCodec {
return NewCustomClientCodec(encode.DefaultEncoderSelector) return NewCustomClientCodec(codec.DefaultCodecSelector)
} }
// ClientCodec creates a ClientCodecRequest to process each request. // ClientCodec creates a ClientCodecRequest to process each request.
type ClientCodec struct { type ClientCodec struct {
encSel encode.EncoderSelector codecSel codec.CodecSelector
} }
func (cc *ClientCodec) Write(w io.Writer, method string, args interface{}, id interface{}) error { func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error {
req := retainClientRequest(method, args, id) req := retainClientRequest(method, args, id)
defer func() { defer func() {
if nil != req { releaseClientRequest(req)
releaseClientRequest(req)
}
}() }()
encoder := json.NewEncoder(cc.encSel.SelectByWriter(w).Encode(w)) encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w))
if err := encoder.Encode(req); nil != err { if err := encoder.Encode(req); nil != err {
return err return err
} }
@ -85,126 +50,97 @@ func (cc *ClientCodec) Write(w io.Writer, method string, args interface{}, id in
return nil return nil
} }
// NewResponse returns a ClientCodecResponse. // NewMessage returns a ClientMessageCodec.
func (cc *ClientCodec) NewResponseOrNotify(r io.Reader) (protocol.ClientCodecResponseOrNotify, error) { func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) {
return newClientCodecResponseOrNotify(r, cc.encSel.SelectByReader(r)) return newClientMessageCodec(r, cc.codecSel.SelectByReader(r))
} }
// newCodecRequest returns a new ServerCodecRequest. // newClientMessageCodec returns a new ClientMessageCodec.
func newClientCodecResponseOrNotify(r io.Reader, encoder encode.Encoder) (protocol.ClientCodecResponseOrNotify, error) { func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) {
// Decode the request body and check if RPC method is valid. msg := retainClientMessage()
var raw json.RawMessage err := json.NewDecoder(r).Decode(msg)
dec := json.NewDecoder(r)
err := dec.Decode(&raw)
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, err
}
if err != nil { if err != nil {
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, err
}
err = &Error{ err = &Error{
Code: E_PARSE, Code: E_PARSE,
Message: err.Error(), Message: err.Error(),
Data: raw, Data: msg,
}
}
if msg.Version != Version {
err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: msg,
} }
} }
ccrn := retainClientCodecResponseOrNotify() return retainClientMessageCodec(msg, err, codec), nil
}
if notify, err := newClientCodecNotify(raw); nil != err { type ClientMessageCodec struct {
res, err := newClientCodecResponse(raw) msg *clientMessage
if nil != err { err error
releaseClientCodecResponseOrNotify(ccrn) codec codec.Codec
return nil, fmt.Errorf("Is not response or notification [%v]", raw) }
}
ccrn.response = res func (ccm *ClientMessageCodec) MessageType() MessageType {
} else { return ccm.msg.MessageType
ccrn.notify = notify }
func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) {
switch ccm.msg.MessageType {
case protocol.MessageTypeResponse:
return newClientResponseCodec(ccm.msg.Message, ccm.codec)
case protocol.MessageTypeNotification:
return newClientNotificationCodec(ccm.msg.Message, ccm.codec)
}
return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType)
}
func (ccm *ClientMessageCodec) Close() {
if nil != ccm.msg {
releaseClientMessage(ccm.msg)
} }
// if res, err := newClientCodecResponse(raw); nil != err { releaseClientMessageCodec(ccm)
// notify, err := newClientCodecNotify(raw)
// if nil != err {
// releaseClientCodecResponseOrNotify(ccrn)
// return nil, fmt.Errorf("Is not response or notification [%v]", raw)
// }
// ccrn.notify = notify
// } else {
// ccrn.response = res
// }
return ccrn, nil
} }
type ClientCodecResponseOrNotify struct { var clientMessagePool sync.Pool
notify protocol.ClientCodecNotify
response protocol.ClientCodecResponse
}
func (ccrn *ClientCodecResponseOrNotify) IsResponse() bool { func retainClientMessage() *clientMessage {
return nil != ccrn.response v := clientMessagePool.Get()
}
func (ccrn *ClientCodecResponseOrNotify) IsNotify() bool {
return nil != ccrn.notify
}
func (ccrn *ClientCodecResponseOrNotify) GetResponse() protocol.ClientCodecResponse {
return ccrn.response
}
func (ccrn *ClientCodecResponseOrNotify) GetNotify() protocol.ClientCodecNotify {
return ccrn.notify
}
func (ccrn *ClientCodecResponseOrNotify) Complete() {
if nil != ccrn.notify {
ccrn.notify.Complete()
}
if nil != ccrn.response {
ccrn.response.Complete()
}
releaseClientCodecResponseOrNotify(ccrn)
}
var clientRequestPool sync.Pool
func retainClientRequest(method string, params interface{}, id interface{}) *clientRequest {
var cr *clientRequest
v := clientRequestPool.Get()
if v == nil { if v == nil {
cr = &clientRequest{} return &clientMessage{}
} else {
cr = v.(*clientRequest)
} }
return v.(*clientMessage)
cr.Version = Version
cr.Method = method
cr.Params = params
cr.ID = id
return cr
} }
func releaseClientRequest(cr *clientRequest) { func releaseClientMessage(cm *clientMessage) {
cr.Version = "" clientMessagePool.Put(cm)
cr.Method = ""
cr.Params = nil
cr.ID = nil
clientRequestPool.Put(cr)
} }
var clientCodecResponseOrNotifyPool sync.Pool var clientMessageCodecPool sync.Pool
func retainClientCodecResponseOrNotify() *ClientCodecResponseOrNotify { func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec {
v := clientCodecResponseOrNotifyPool.Get() var ccm *ClientMessageCodec
v := clientMessageCodecPool.Get()
if v == nil { if v == nil {
return &ClientCodecResponseOrNotify{} ccm = &ClientMessageCodec{}
} else {
ccm = v.(*ClientMessageCodec)
} }
return v.(*ClientCodecResponseOrNotify)
ccm.msg = msg
ccm.err = err
ccm.codec = codec
return ccm
} }
func releaseClientCodecResponseOrNotify(cr *ClientCodecResponseOrNotify) { func releaseClientMessageCodec(cr *ClientMessageCodec) {
clientCodecResponseOrNotifyPool.Put(cr) clientMessageCodecPool.Put(cr)
} }

View File

@ -0,0 +1,97 @@
package json
import (
"encoding/json"
"sync"
"git.loafle.net/commons_go/rpc/codec"
"git.loafle.net/commons_go/rpc/protocol"
)
// ----------------------------------------------------------------------------
// ClientNotificationCodec
// ----------------------------------------------------------------------------
// clientRequest represents a JSON-RPC notification sent to a client.
type clientNotification struct {
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// Object to pass as request parameter to the method.
Params *json.RawMessage `json:"params,omitempty"`
}
// newClientNotificationCodec returns a new ClientNotificationCodec.
func newClientNotificationCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientNotificationCodec, error) {
// Decode the request body and check if RPC method is valid.
cnc := retainClientNotificationCodec()
if err := json.Unmarshal(*raw, &cnc.notification); nil != err {
releaseClientNotificationCodec(cnc)
return nil, err
}
return cnc, nil
}
// ClientNotificationCodec decodes and encodes a single notification.
type ClientNotificationCodec struct {
notification clientNotification
err error
}
func (crc *ClientNotificationCodec) Method() string {
return crc.notification.Method
}
func (crc *ClientNotificationCodec) ReadParams(args []interface{}) error {
if crc.err == nil && crc.notification.Params != nil {
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
if err := json.Unmarshal(*crc.notification.Params, &args); err != nil {
crc.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: crc.notification.Params,
}
}
}
return crc.err
}
func (crc *ClientNotificationCodec) Params() ([]string, error) {
if crc.err == nil && crc.notification.Params != nil {
var results []string
if err := json.Unmarshal(*crc.notification.Params, &results); err != nil {
crc.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: crc.notification.Params,
}
return nil, crc.err
}
return results, nil
}
return nil, crc.err
}
func (crc *ClientNotificationCodec) Close() {
releaseClientNotificationCodec(crc)
}
var clientNotificationCodecPool sync.Pool
func retainClientNotificationCodec() *ClientNotificationCodec {
v := clientNotificationCodecPool.Get()
if v == nil {
return &ClientNotificationCodec{}
}
return v.(*ClientNotificationCodec)
}
func releaseClientNotificationCodec(crc *ClientNotificationCodec) {
crc.notification.Method = ""
crc.notification.Params = nil
clientNotificationCodecPool.Put(crc)
}

View File

@ -1,103 +0,0 @@
package json
import (
"encoding/json"
"fmt"
"sync"
"git.loafle.net/commons_go/rpc/protocol"
)
// ----------------------------------------------------------------------------
// ClientCodecNotify
// ----------------------------------------------------------------------------
// newCodecRequest returns a new ClientCodecNotify.
func newClientCodecNotify(raw json.RawMessage) (protocol.ClientCodecNotify, error) {
// Decode the request body and check if RPC method is valid.
ccn := retainClientCodecNotify()
err := json.Unmarshal(raw, &ccn.notify)
if err != nil {
releaseClientCodecNotify(ccn)
return nil, err
}
if "" == ccn.notify.Method {
releaseClientCodecNotify(ccn)
return nil, fmt.Errorf("This is not Client Notify")
}
if ccn.notify.Version != Version {
ccn.err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: ccn.notify,
}
}
return ccn, nil
}
// ClientCodecNotify decodes and encodes a single notification.
type ClientCodecNotify struct {
notify clientNotify
err error
}
func (ccn *ClientCodecNotify) Method() string {
return ccn.notify.Method
}
func (ccn *ClientCodecNotify) ReadParams(args []interface{}) error {
if ccn.err == nil && ccn.notify.Params != nil {
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
if err := json.Unmarshal(*ccn.notify.Params, &args); err != nil {
ccn.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: ccn.notify.Params,
}
}
}
return ccn.err
}
func (ccn *ClientCodecNotify) Params() ([]string, error) {
if ccn.err == nil && ccn.notify.Params != nil {
var results []string
if err := json.Unmarshal(*ccn.notify.Params, &results); err != nil {
ccn.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: ccn.notify.Params,
}
return nil, ccn.err
}
return results, nil
}
return nil, ccn.err
}
func (ccn *ClientCodecNotify) Complete() {
releaseClientCodecNotify(ccn)
}
var clientCodecNotifyPool sync.Pool
func retainClientCodecNotify() *ClientCodecNotify {
v := clientCodecNotifyPool.Get()
if v == nil {
return &ClientCodecNotify{}
}
return v.(*ClientCodecNotify)
}
func releaseClientCodecNotify(ccn *ClientCodecNotify) {
ccn.notify.Version = ""
ccn.notify.Method = ""
ccn.notify.Params = nil
clientCodecNotifyPool.Put(ccn)
}

View File

@ -0,0 +1,51 @@
package json
import "sync"
// ----------------------------------------------------------------------------
// Request and Response
// ----------------------------------------------------------------------------
// clientRequest represents a JSON-RPC request sent by a client.
type clientRequest struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// Object to pass as request parameter to the method.
Params interface{} `json:"params"`
// The request id. This can be of any type. It is used to match the
// response with the request that it is replying to.
ID interface{} `json:"id"`
}
var clientRequestPool sync.Pool
func retainClientRequest(method string, params interface{}, id interface{}) *clientRequest {
var cr *clientRequest
v := clientRequestPool.Get()
if v == nil {
cr = &clientRequest{}
} else {
cr = v.(*clientRequest)
}
cr.Version = Version
cr.Method = method
cr.Params = params
cr.ID = id
return cr
}
func releaseClientRequest(cr *clientRequest) {
cr.Version = ""
cr.Method = ""
cr.Params = nil
cr.ID = nil
clientRequestPool.Put(cr)
}

View File

@ -5,53 +5,53 @@ import (
"fmt" "fmt"
"sync" "sync"
"git.loafle.net/commons_go/rpc/codec"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
) )
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// ClientCodecResponse // ClientResponseCodec
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// clientResponse represents a JSON-RPC response returned to a client.
type clientResponse struct {
Result *json.RawMessage `json:"result,omitempty"`
Error error `json:"error,omitempty"`
ID interface{} `json:"id,omitempty"`
}
// newClientCodecResponse returns a new ClientCodecResponse. // newClientResponseCodec returns a new ClientResponseCodec.
func newClientCodecResponse(raw json.RawMessage) (protocol.ClientCodecResponse, error) { func newClientResponseCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientResponseCodec, error) {
// Decode the request body and check if RPC method is valid. // Decode the request body and check if RPC method is valid.
ccr := retainClientCodecResponse() ccr := retainClientResponseCodec()
err := json.Unmarshal(raw, &ccr.response)
err := json.Unmarshal(*raw, &ccr.res)
if err != nil { if err != nil {
releaseClientCodecResponse(ccr) releaseClientResponseCodec(ccr)
return nil, err return nil, err
} }
if nil == ccr.response.Result && nil == ccr.response.Error { if nil == ccr.res.Result && nil == ccr.res.Error {
releaseClientCodecResponse(ccr) releaseClientResponseCodec(ccr)
return nil, fmt.Errorf("This is not Response") return nil, fmt.Errorf("This is not Response")
} }
if ccr.response.Version != Version {
ccr.err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: ccr.response,
}
}
return ccr, nil return ccr, nil
} }
// ClientCodecResponse decodes and encodes a single request. // ClientResponseCodec decodes and encodes a single request.
type ClientCodecResponse struct { type ClientResponseCodec struct {
response clientResponse res clientResponse
err error err error
} }
func (ccr *ClientCodecResponse) ID() interface{} { func (ccr *ClientResponseCodec) ID() interface{} {
return ccr.response.ID return ccr.res.ID
} }
func (ccr *ClientCodecResponse) Result(result interface{}) error { func (ccr *ClientResponseCodec) Result(result interface{}) error {
if ccr.err == nil && ccr.response.Result != nil { if ccr.err == nil && ccr.res.Result != nil {
if err := json.Unmarshal(*ccr.response.Result, &result); err != nil { if err := json.Unmarshal(*ccr.res.Result, &result); err != nil {
params := [1]interface{}{result} params := [1]interface{}{result}
if err = json.Unmarshal(*ccr.response.Result, &params); err != nil { if err = json.Unmarshal(*ccr.res.Result, &params); err != nil {
ccr.err = err ccr.err = err
} }
} }
@ -59,29 +59,28 @@ func (ccr *ClientCodecResponse) Result(result interface{}) error {
return ccr.err return ccr.err
} }
func (ccr *ClientCodecResponse) Error() error { func (ccr *ClientResponseCodec) Error() error {
return ccr.response.Error return ccr.res.Error
} }
func (ccr *ClientCodecResponse) Complete() { func (ccr *ClientResponseCodec) Close() {
releaseClientCodecResponse(ccr) releaseClientResponseCodec(ccr)
} }
var clientCodecResponsePool sync.Pool var clientResponseCodecPool sync.Pool
func retainClientCodecResponse() *ClientCodecResponse { func retainClientResponseCodec() *ClientResponseCodec {
v := clientCodecResponsePool.Get() v := clientResponseCodecPool.Get()
if v == nil { if v == nil {
return &ClientCodecResponse{} return &ClientResponseCodec{}
} }
return v.(*ClientCodecResponse) return v.(*ClientResponseCodec)
} }
func releaseClientCodecResponse(ccr *ClientCodecResponse) { func releaseClientResponseCodec(ccr *ClientResponseCodec) {
ccr.response.Version = "" ccr.res.Result = nil
ccr.response.Result = nil ccr.res.Error = nil
ccr.response.Error = nil ccr.res.ID = 0
ccr.response.ID = 0
clientCodecResponsePool.Put(ccr) clientResponseCodecPool.Put(ccr)
} }

View File

@ -5,50 +5,17 @@ import (
"io" "io"
"sync" "sync"
"git.loafle.net/commons_go/rpc/encode" "git.loafle.net/commons_go/rpc/codec"
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
) )
var null = json.RawMessage([]byte("null")) var null = json.RawMessage([]byte("null"))
// ---------------------------------------------------------------------------- type serverMessage struct {
// Request and Response
// ----------------------------------------------------------------------------
// serverRequest represents a JSON-RPC request received by the server.
type serverRequest struct {
// JSON-RPC protocol. // JSON-RPC protocol.
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
MessageType protocol.MessageType `json:"messageType"`
// A String containing the name of the method to be invoked. Message interface{} `json:"message"`
Method string `json:"method"`
// A Structured value to pass as arguments to the method.
Params *json.RawMessage `json:"params,omitempty"`
// The request id. MUST be a string, number or null.
// Our implementation will not do type checking for id.
// It will be copied as it is.
ID *json.RawMessage `json:"id,omitempty"`
}
// serverResponse represents a JSON-RPC response returned by the server.
type serverResponse struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// The Object that was returned by the invoked method. This must be null
// in case there was an error invoking the method.
// As per spec the member will be omitted if there was an error.
Result interface{} `json:"result,omitempty"`
// An Error object if there was an error invoking the method. It must be
// null if there was no error.
// As per spec the member will be omitted if there was no error.
Error *Error `json:"error,omitempty"`
// This must be the same id as the request it is responding to.
ID *json.RawMessage `json:"id,omitempty"`
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -56,279 +23,65 @@ type serverResponse struct {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// NewCustomServerCodec returns a new JSON Codec based on passed encoder selector. // NewCustomServerCodec returns a new JSON Codec based on passed encoder selector.
func NewCustomServerCodec(encSel encode.EncoderSelector) *ServerCodec { func NewCustomServerCodec(codecSel codec.CodecSelector) protocol.ServerCodec {
return &ServerCodec{encSel: encSel} return &ServerCodec{codecSel: codecSel}
} }
// NewServerCodec returns a new JSON Codec. // NewServerCodec returns a new JSON Codec.
func NewServerCodec() *ServerCodec { func NewServerCodec() protocol.ServerCodec {
return NewCustomServerCodec(encode.DefaultEncoderSelector) return NewCustomServerCodec(codec.DefaultCodecSelector)
} }
// ServerCodec creates a ServerCodecRequest to process each request. // ServerCodec creates a ServerRequestCodec to process each request.
type ServerCodec struct { type ServerCodec struct {
encSel encode.EncoderSelector codecSel codec.CodecSelector
notifyMtx sync.Mutex
notify clientRequest
} }
// NewRequest returns a ServerCodecRequest. // NewRequest returns a ServerRequestCodec.
func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerCodecRequest, error) { func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, error) {
return newServerCodecRequest(r, sc.encSel.SelectByReader(r)) return newServerRequestCodec(r, sc.codecSel.SelectByReader(r))
} }
// WriteNotify send a notification from server to client. // WriteNotification send a notification from server to client.
func (sc *ServerCodec) WriteNotify(w io.Writer, method string, args interface{}) error { func (sc *ServerCodec) WriteNotification(w io.Writer, method string, args interface{}) error {
sc.notifyMtx.Lock() noti := retainServerNotification(method, args)
defer func() {
sc.notify.Version = Version releaseServerNotification(noti)
sc.notify.Method = method }()
sc.notify.Params = args msg := retainServerMessage(protocol.MessageTypeNotification, noti)
defer func() {
encoder := json.NewEncoder(sc.encSel.SelectByWriter(w).Encode(w)) releaseServerMessage(msg)
err := encoder.Encode(&sc.notify) }()
sc.notifyMtx.Unlock() encoder := json.NewEncoder(sc.codecSel.SelectByWriter(w).Encode(w))
// Not sure in which case will this happen. But seems harmless. // Not sure in which case will this happen. But seems harmless.
if err != nil { if err := encoder.Encode(msg); nil != err {
return err return err
} }
return nil return nil
} }
// ---------------------------------------------------------------------------- var serverMessagePool sync.Pool
// ServerCodecRequest
// ----------------------------------------------------------------------------
// newCodecRequest returns a new ServerCodecRequest. func retainServerMessage(msgType protocol.MessageType, msg interface{}) *serverMessage {
func newServerCodecRequest(r io.Reader, encoder encode.Encoder) (protocol.ServerCodecRequest, error) { var sm *serverMessage
// Decode the request body and check if RPC method is valid. v := serverMessagePool.Get()
req := retainServerRequest()
err := json.NewDecoder(r).Decode(req)
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, err
}
if err != nil {
err = &Error{
Code: E_PARSE,
Message: err.Error(),
Data: req,
}
}
if req.Version != Version {
err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: req,
}
}
return retainServerCodecRequest(req, err, encoder), nil
}
// CodecRequest decodes and encodes a single request.
type ServerCodecRequest struct {
request *serverRequest
err error
encoder encode.Encoder
}
// Complete is callback function that end of request.
func (scr *ServerCodecRequest) Complete() {
if nil != scr.request {
releaseServerRequest(scr.request)
}
releaseServerCodecRequest(scr)
}
// Method returns the RPC method for the current request.
//
// The method uses a dotted notation as in "Service.Method".
func (scr *ServerCodecRequest) Method() string {
return scr.request.Method
}
// ReadRequest fills the request object for the RPC method.
//
// ReadRequest parses request parameters in two supported forms in
// accordance with http://www.jsonrpc.org/specification#parameter_structures
//
// by-position: params MUST be an Array, containing the
// values in the Server expected order.
//
// by-name: params MUST be an Object, with member names
// that match the Server expected parameter names. The
// absence of expected names MAY result in an error being
// generated. The names MUST match exactly, including
// case, to the method's expected parameters.
func (scr *ServerCodecRequest) ReadParams(args []interface{}) error {
if scr.err == nil && scr.request.Params != nil {
// Note: if scr.request.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
// if err := json.Unmarshal(*scr.request.Params, &args); err != nil {
// // Clearly JSON params is not a structured object,
// // fallback and attempt an unmarshal with JSON params as
// // array value and RPC params is struct. Unmarshal into
// // array containing the request struct.
// scr.err = &Error{
// Code: E_INVALID_REQ,
// Message: err.Error(),
// Data: scr.request.Params,
// }
// }
raws := make([]json.RawMessage, len(args))
if err := json.Unmarshal(*scr.request.Params, &raws); err != nil {
scr.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: scr.request.Params,
}
return scr.err
}
for indexI := 0; indexI < len(args); indexI++ {
raw := raws[indexI]
arg := args[indexI]
if err := json.Unmarshal(raw, &arg); err != nil {
scr.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: scr.request.Params,
}
return scr.err
}
}
}
return scr.err
}
func (scr *ServerCodecRequest) Params() ([]string, error) {
if scr.err == nil && scr.request.Params != nil {
var results []string
if err := json.Unmarshal(*scr.request.Params, &results); err != nil {
scr.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: scr.request.Params,
}
return nil, scr.err
}
return results, nil
}
return nil, scr.err
}
// WriteResponse encodes the response and writes it to the ResponseWriter.
func (scr *ServerCodecRequest) WriteResponse(w io.Writer, reply interface{}) error {
res := retainServerResponse(Version, reply, nil, scr.request.ID)
return scr.writeServerResponse(w, res)
}
// WriteError encodes the response and writes it to the ResponseWriter.
func (scr *ServerCodecRequest) WriteError(w io.Writer, status int, err error) error {
jsonErr, ok := err.(*Error)
if !ok {
jsonErr = &Error{
Code: E_SERVER,
Message: err.Error(),
}
}
res := retainServerResponse(Version, nil, jsonErr, scr.request.ID)
return scr.writeServerResponse(w, res)
}
func (scr *ServerCodecRequest) writeServerResponse(w io.Writer, res *serverResponse) error {
defer func() {
if nil != res {
releaseServerResponse(res)
}
}()
// ID is null for notifications and they don't have a response.
if scr.request.ID != nil {
encoder := json.NewEncoder(scr.encoder.Encode(w))
err := encoder.Encode(res)
// Not sure in which case will this happen. But seems harmless.
if err != nil {
return err
}
}
return nil
}
type EmptyResponse struct {
}
var serverCodecRequestPool sync.Pool
func retainServerCodecRequest(request *serverRequest, err error, encoder encode.Encoder) *ServerCodecRequest {
var scr *ServerCodecRequest
v := serverCodecRequestPool.Get()
if v == nil { if v == nil {
scr = &ServerCodecRequest{} sm = &serverMessage{}
} else { } else {
scr = v.(*ServerCodecRequest) sm = v.(*serverMessage)
} }
scr.request = request sm.Version = Version
scr.err = err sm.MessageType = msgType
scr.encoder = encoder sm.Message = msg
return scr return sm
} }
func releaseServerCodecRequest(scr *ServerCodecRequest) { func releaseServerMessage(sm *serverMessage) {
scr.request = nil sm.Version = ""
scr.err = nil sm.MessageType = protocol.MessageTypeUnknown
scr.encoder = nil sm.Message = nil
serverCodecRequestPool.Put(scr) serverMessagePool.Put(sm)
}
var serverRequestPool sync.Pool
func retainServerRequest() *serverRequest {
v := serverRequestPool.Get()
if v == nil {
return &serverRequest{}
}
return v.(*serverRequest)
}
func releaseServerRequest(sr *serverRequest) {
sr.Method = ""
sr.Params = nil
sr.ID = nil
serverRequestPool.Put(sr)
}
var serverResponsePool sync.Pool
func retainServerResponse(version string, result interface{}, err *Error, id *json.RawMessage) *serverResponse {
var sr *serverResponse
v := serverResponsePool.Get()
if v == nil {
sr = &serverResponse{}
} else {
sr = v.(*serverResponse)
}
sr.Version = version
sr.Result = result
sr.Error = err
sr.ID = id
return sr
}
func releaseServerResponse(sr *serverResponse) {
sr.Version = ""
sr.Result = nil
sr.Error = nil
sr.ID = nil
serverResponsePool.Put(sr)
} }

View File

@ -0,0 +1,38 @@
package json
import (
"sync"
)
// clientRequest represents a JSON-RPC request sent by a client.
type serverNotification struct {
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// Object to pass as request parameter to the method.
Params interface{} `json:"params"`
}
var serverNotificationPool sync.Pool
func retainServerNotification(method string, args interface{}) *serverNotification {
var sn *serverNotification
v := serverNotificationPool.Get()
if v == nil {
sn = &serverNotification{}
} else {
sn = v.(*serverNotification)
}
sn.Method = method
sn.Params = args
return sn
}
func releaseServerNotification(sn *serverNotification) {
sn.Method = ""
sn.Params = nil
serverNotificationPool.Put(sn)
}

View File

@ -0,0 +1,238 @@
package json
import (
"encoding/json"
"io"
"sync"
"git.loafle.net/commons_go/rpc/codec"
"git.loafle.net/commons_go/rpc/protocol"
)
// ----------------------------------------------------------------------------
// Request
// ----------------------------------------------------------------------------
// serverRequest represents a JSON-RPC request received by the server.
type serverRequest struct {
// JSON-RPC protocol.
Version string `json:"jsonrpc"`
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// A Structured value to pass as arguments to the method.
Params *json.RawMessage `json:"params,omitempty"`
// The request id. MUST be a string, number or null.
// Our implementation will not do type checking for id.
// It will be copied as it is.
ID *json.RawMessage `json:"id,omitempty"`
}
// ----------------------------------------------------------------------------
// ServerRequestCodec
// ----------------------------------------------------------------------------
// newRequestCodec returns a new ServerRequestCodec.
func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) {
// Decode the request body and check if RPC method is valid.
req := retainServerRequest()
err := json.NewDecoder(r).Decode(req)
if err == io.ErrUnexpectedEOF || err == io.EOF {
return nil, err
}
if err != nil {
err = &Error{
Code: E_PARSE,
Message: err.Error(),
Data: req,
}
}
if req.Version != Version {
err = &Error{
Code: E_INVALID_REQ,
Message: "jsonrpc must be " + Version,
Data: req,
}
}
return retainServerRequestCodec(req, err, codec), nil
}
// ServerRequestCodec decodes and encodes a single request.
type ServerRequestCodec struct {
req *serverRequest
err error
codec codec.Codec
}
// Close is callback function that end of request.
func (src *ServerRequestCodec) Close() {
if nil != src.req {
releaseServerRequest(src.req)
}
releaseServerRequestCodec(src)
}
// Method returns the RPC method for the current request.
//
// The method uses a dotted notation as in "Service.Method".
func (src *ServerRequestCodec) Method() string {
return src.req.Method
}
// ReadRequest fills the request object for the RPC method.
//
// ReadRequest parses request parameters in two supported forms in
// accordance with http://www.jsonrpc.org/specification#parameter_structures
//
// by-position: params MUST be an Array, containing the
// values in the Server expected order.
//
// by-name: params MUST be an Object, with member names
// that match the Server expected parameter names. The
// absence of expected names MAY result in an error being
// generated. The names MUST match exactly, including
// case, to the method's expected parameters.
func (src *ServerRequestCodec) ReadParams(args []interface{}) error {
if src.err == nil && src.req.Params != nil {
// Note: if src.req.Params is nil it's not an error, it's an optional member.
// JSON params structured object. Unmarshal to the args object.
// if err := json.Unmarshal(*src.req.Params, &args); err != nil {
// // Clearly JSON params is not a structured object,
// // fallback and attempt an unmarshal with JSON params as
// // array value and RPC params is struct. Unmarshal into
// // array containing the request struct.
// src.err = &Error{
// Code: E_INVALID_REQ,
// Message: err.Error(),
// Data: src.req.Params,
// }
// }
raws := make([]json.RawMessage, len(args))
if err := json.Unmarshal(*src.req.Params, &raws); err != nil {
src.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: src.req.Params,
}
return src.err
}
for indexI := 0; indexI < len(args); indexI++ {
raw := raws[indexI]
arg := args[indexI]
if err := json.Unmarshal(raw, &arg); err != nil {
src.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: src.req.Params,
}
return src.err
}
}
}
return src.err
}
func (src *ServerRequestCodec) Params() ([]string, error) {
if src.err == nil && src.req.Params != nil {
var results []string
if err := json.Unmarshal(*src.req.Params, &results); err != nil {
src.err = &Error{
Code: E_INVALID_REQ,
Message: err.Error(),
Data: src.req.Params,
}
return nil, src.err
}
return results, nil
}
return nil, src.err
}
// WriteResponse encodes the response and writes it to the ResponseWriter.
func (src *ServerRequestCodec) WriteResponse(w io.Writer, reply interface{}) error {
res := retainServerResponse(reply, nil, src.req.ID)
return src.writeServerResponse(w, res)
}
// WriteError encodes the response and writes it to the ResponseWriter.
func (src *ServerRequestCodec) WriteError(w io.Writer, status int, err error) error {
jsonErr, ok := err.(*Error)
if !ok {
jsonErr = &Error{
Code: E_SERVER,
Message: err.Error(),
}
}
res := retainServerResponse(nil, jsonErr, src.req.ID)
return src.writeServerResponse(w, res)
}
func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverResponse) error {
defer func() {
releaseServerResponse(res)
}()
// ID is null for notifications and they don't have a response.
if src.req.ID != nil {
msg := retainServerMessage(protocol.MessageTypeResponse, res)
defer func() {
releaseServerMessage(msg)
}()
encoder := json.NewEncoder(src.codec.Encode(w))
// Not sure in which case will this happen. But seems harmless.
if err := encoder.Encode(msg); nil != err {
return err
}
}
return nil
}
var serverRequestCodecPool sync.Pool
func retainServerRequestCodec(req *serverRequest, err error, codec codec.Codec) *ServerRequestCodec {
var src *ServerRequestCodec
v := serverRequestCodecPool.Get()
if v == nil {
src = &ServerRequestCodec{}
} else {
src = v.(*ServerRequestCodec)
}
src.req = req
src.err = err
src.codec = codec
return src
}
func releaseServerRequestCodec(src *ServerRequestCodec) {
src.req = nil
src.err = nil
src.codec = nil
serverRequestCodecPool.Put(src)
}
var serverRequestPool sync.Pool
func retainServerRequest() *serverRequest {
v := serverRequestPool.Get()
if v == nil {
return &serverRequest{}
}
return v.(*serverRequest)
}
func releaseServerRequest(sr *serverRequest) {
sr.Method = ""
sr.Params = nil
sr.ID = nil
serverRequestPool.Put(sr)
}

View File

@ -0,0 +1,52 @@
package json
import (
"encoding/json"
"sync"
)
// ----------------------------------------------------------------------------
// Response
// ----------------------------------------------------------------------------
// serverResponse represents a JSON-RPC response returned by the server.
type serverResponse struct {
// The Object that was returned by the invoked method. This must be null
// in case there was an error invoking the method.
// As per spec the member will be omitted if there was an error.
Result interface{} `json:"result,omitempty"`
// An Error object if there was an error invoking the method. It must be
// null if there was no error.
// As per spec the member will be omitted if there was no error.
Error *Error `json:"error,omitempty"`
// This must be the same id as the request it is responding to.
ID *json.RawMessage `json:"id,omitempty"`
}
var serverResponsePool sync.Pool
func retainServerResponse(result interface{}, err *Error, id *json.RawMessage) *serverResponse {
var sr *serverResponse
v := serverResponsePool.Get()
if v == nil {
sr = &serverResponse{}
} else {
sr = v.(*serverResponse)
}
sr.Result = result
sr.Error = err
sr.ID = id
return sr
}
func releaseServerResponse(sr *serverResponse) {
sr.Result = nil
sr.Error = nil
sr.ID = nil
serverResponsePool.Put(sr)
}

View File

@ -10,5 +10,5 @@ type RegistryCodec interface {
// Reads the request filling the RPC method args. // Reads the request filling the RPC method args.
ReadParams(args []interface{}) error ReadParams(args []interface{}) error
Params() ([]string, error) Params() ([]string, error)
Complete() Close()
} }

View File

@ -4,14 +4,15 @@ import (
"io" "io"
) )
// ServerCodec creates a ServerCodecRequest to process each request. // ServerCodec creates a ServerRequestCodec to process each request.
type ServerCodec interface { type ServerCodec interface {
NewRequest(r io.Reader) (ServerCodecRequest, error) NewRequest(r io.Reader) (ServerRequestCodec, error)
WriteNotification(w io.Writer, method string, args interface{}) error
} }
// ServerCodecRequest decodes a request and encodes a response using a specific // ServerRequestCodec decodes a request and encodes a response using a specific
// serialization scheme. // serialization scheme.
type ServerCodecRequest interface { type ServerRequestCodec interface {
RegistryCodec RegistryCodec
WriteResponse(w io.Writer, reply interface{}) error WriteResponse(w io.Writer, reply interface{}) error
WriteError(w io.Writer, status int, err error) error WriteError(w io.Writer, status int, err error) error

View File

@ -1,14 +0,0 @@
package server
import (
"io"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error {
return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) {
return sh.Invoke(codecReq)
})
}

View File

@ -1,12 +0,0 @@
package server
import (
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServerHandler interface {
rpc.ServerHandler
Invoke(codec protocol.RegistryCodec) (result interface{}, err error)
}

View File

@ -1,23 +0,0 @@
package server
import (
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServerHandlers struct {
rpc.ServerHandlers
RPCRegistry rpc.Registry
}
func (sh *ServerHandlers) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) {
return sh.RPCRegistry.Invoke(codec)
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
if nil == sh.RPCRegistry {
panic("RPCRegistry must be specified.")
}
}

View File

@ -0,0 +1,7 @@
package server
import "git.loafle.net/commons_go/rpc"
type ServletHandler interface {
rpc.ServletHandler
}

View File

@ -0,0 +1,36 @@
package server
import (
"fmt"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServletHandlers struct {
rpc.ServletHandlers
RPCRegistry rpc.Registry
}
func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) {
if !sh.RPCRegistry.HasMethod(requestCodec.Method()) {
return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method())
}
result, err = sh.RPCRegistry.Invoke(requestCodec)
if nil != err {
}
return
}
func (sh *ServletHandlers) Validate() {
sh.ServletHandlers.Validate()
if nil == sh.RPCRegistry {
logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified"))
}
}

View File

@ -1,28 +0,0 @@
package rpc
import (
"io"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServerHandler interface {
Init() error
OnStart()
OnPreRead(r io.Reader)
OnPostRead(r io.Reader)
OnPreWriteResult(w io.Writer, result interface{})
OnPostWriteResult(w io.Writer, result interface{})
OnPreWriteError(w io.Writer, err error)
OnPostWriteError(w io.Writer, err error)
OnStop()
Validate()
RegisterCodec(codec protocol.ServerCodec, contentType string)
GetCodec(contentType string) (protocol.ServerCodec, error)
}

View File

@ -1,86 +0,0 @@
package rpc
import (
"fmt"
"io"
"strings"
"git.loafle.net/commons_go/rpc/protocol"
)
func NewServerHandler() ServerHandler {
sh := &ServerHandlers{}
return sh
}
type ServerHandlers struct {
codecs map[string]protocol.ServerCodec
}
func (sh *ServerHandlers) Init() error {
return nil
}
func (sh *ServerHandlers) OnStart() {
// no op
}
func (sh *ServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) OnStop() {
// no op
}
func (sh *ServerHandlers) Validate() {
}
// RegisterCodec adds a new codec to the server.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
func (sh *ServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) {
if nil == sh.codecs {
sh.codecs = make(map[string]protocol.ServerCodec)
}
sh.codecs[strings.ToLower(contentType)] = codec
}
func (sh *ServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) {
var codec protocol.ServerCodec
if contentType == "" && len(sh.codecs) == 1 {
// If Content-Type is not set and only one codec has been registered,
// then default to that codec.
for _, c := range sh.codecs {
codec = c
}
} else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil {
return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType)
}
return codec, nil
}

208
servlet.go Normal file
View File

@ -0,0 +1,208 @@
package rpc
import (
"fmt"
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc/protocol"
)
func NewServlet(sh ServletHandler) Servlet {
return &servlet{
sh: sh,
}
}
type Servlet interface {
Start(contentType string, reader interface{}, writer interface{}) error
Stop()
Send(method string, args ...interface{}) (err error)
}
type servlet struct {
sh ServletHandler
contentType string
reader interface{}
writer interface{}
serverCodec protocol.ServerCodec
messageQueueChan chan *messageState
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (s *servlet) Start(contentType string, reader interface{}, writer interface{}) error {
if nil == s.sh {
panic("Servlet: servlet handler must be specified.")
}
s.sh.Validate()
if s.stopChan != nil {
panic("Servlet: servlet is already running. Stop it before starting it again")
}
sc, err := s.sh.getCodec(contentType)
if nil != err {
return err
}
s.contentType = contentType
s.reader = reader
s.writer = writer
s.serverCodec = sc
if err := s.sh.Init(); nil != err {
logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err))
}
s.stopChan = make(chan struct{})
s.messageQueueChan = make(chan *messageState, s.sh.GetPendingMessages())
s.stopWg.Add(1)
go handleServlet(s)
return nil
}
func (s *servlet) Stop() {
if s.stopChan == nil {
panic("Server: server must be started before stopping it")
}
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
s.sh.Destroy()
s.contentType = ""
s.reader = nil
s.writer = nil
s.serverCodec = nil
logging.Logger().Info(fmt.Sprintf("Servlet is stopped"))
}
func (s *servlet) Send(method string, args ...interface{}) (err error) {
ms := retainMessageState(protocol.MessageTypeNotification)
ms.noti.method = method
ms.noti.args = args
s.messageQueueChan <- ms
return nil
}
func handleServlet(s *servlet) {
defer s.stopWg.Done()
s.stopWg.Add(1)
go handleMessage(s)
for {
requestCodec, err := s.sh.GetRequest(s.serverCodec, s.reader)
if nil != err {
continue
}
s.stopWg.Add(1)
go handleRequest(s, requestCodec)
select {
case <-s.stopChan:
default:
}
}
}
func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) {
defer func() {
s.stopWg.Done()
}()
result, err := s.sh.Invoke(requestCodec)
ms := retainMessageState(protocol.MessageTypeResponse)
ms.res.requestCodec = requestCodec
ms.res.result = result
ms.res.err = err
s.messageQueueChan <- ms
}
func handleMessage(s *servlet) {
defer func() {
s.stopWg.Done()
}()
for {
select {
case ms := <-s.messageQueueChan:
switch ms.messageType {
case protocol.MessageTypeResponse:
if err := s.sh.SendResponse(ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err {
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
}
ms.res.requestCodec.Close()
case protocol.MessageTypeNotification:
if err := s.sh.SendNotification(s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err {
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
}
default:
}
releaseMessageState(ms)
case <-s.stopChan:
return
}
}
}
type messageState struct {
messageType protocol.MessageType
res messageResponse
noti messageNotification
}
type messageResponse struct {
requestCodec protocol.ServerRequestCodec
result interface{}
err error
}
type messageNotification struct {
method string
args []interface{}
}
var messageStatePool sync.Pool
func retainMessageState(messageType protocol.MessageType) *messageState {
var ms *messageState
v := messageStatePool.Get()
if v == nil {
ms = &messageState{}
} else {
ms = v.(*messageState)
}
ms.messageType = messageType
return ms
}
func releaseMessageState(ms *messageState) {
ms.messageType = protocol.MessageTypeUnknown
ms.res.requestCodec = nil
ms.res.result = nil
ms.res.err = nil
ms.noti.method = ""
ms.noti.args = nil
messageStatePool.Put(ms)
}

21
servlet_handler.go Normal file
View File

@ -0,0 +1,21 @@
package rpc
import "git.loafle.net/commons_go/rpc/protocol"
type ServletHandler interface {
Init() error
GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error)
Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error)
SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error
SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error
Destroy()
RegisterCodec(contentType string, codec protocol.ServerCodec)
getCodec(contentType string) (protocol.ServerCodec, error)
GetPendingMessages() int
Validate()
}

82
servlet_handlers.go Normal file
View File

@ -0,0 +1,82 @@
package rpc
import (
"fmt"
"strings"
"git.loafle.net/commons_go/rpc/protocol"
)
type ServletHandlers struct {
// The maximum number of pending messages in the queue.
//
// The number of pending requsts should exceed the expected number
// of concurrent goroutines calling client's methods.
// Otherwise a lot of ClientError.Overflow errors may appear.
//
// Default is DefaultPendingMessages.
PendingMessages int
codecs map[string]protocol.ServerCodec
}
func (sh *ServletHandlers) Init() error {
return nil
}
func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) {
return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented")
}
func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) {
return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented")
}
func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error {
return fmt.Errorf("Servlet Handler: SendResponse is not implemented")
}
func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error {
return fmt.Errorf("Servlet Handler: SendNotification is not implemented")
}
func (sh *ServletHandlers) Destroy() {
// no op
}
// RegisterCodec adds a new codec to the server.
//
// Codecs are defined to process a given serialization scheme, e.g., JSON or
// XML. A codec is chosen based on the "Content-Type" header from the request,
// excluding the charset definition.
func (sh *ServletHandlers) RegisterCodec(contentType string, codec protocol.ServerCodec) {
if nil == sh.codecs {
sh.codecs = make(map[string]protocol.ServerCodec)
}
sh.codecs[strings.ToLower(contentType)] = codec
}
func (sh *ServletHandlers) getCodec(contentType string) (protocol.ServerCodec, error) {
var codec protocol.ServerCodec
if contentType == "" && len(sh.codecs) == 1 {
// If Content-Type is not set and only one codec has been registered,
// then default to that codec.
for _, c := range sh.codecs {
codec = c
}
} else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil {
return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType)
}
return codec, nil
}
func (sh *ServletHandlers) GetPendingMessages() int {
return sh.PendingMessages
}
func (sh *ServletHandlers) Validate() {
if 0 >= sh.PendingMessages {
sh.PendingMessages = DefaultPendingMessages
}
}

View File

@ -0,0 +1,7 @@
package socket
import "git.loafle.net/commons_go/rpc/server"
type ServletHandler interface {
server.ServletHandler
}

View File

@ -0,0 +1,45 @@
package socket
import (
"io"
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/server"
)
type ServletHandlers struct {
server.ServletHandlers
}
func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) {
r := reader.(io.Reader)
requestCodec, err := codec.NewRequest(r)
return requestCodec, err
}
func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error {
w := writer.(io.Writer)
if nil != err {
if lerr := requestCodec.WriteError(w, 500, err); nil != lerr {
}
} else {
if err := requestCodec.WriteResponse(w, result); nil != err {
}
}
return nil
}
func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error {
w := writer.(io.Writer)
if err := codec.WriteNotification(w, method, args); nil != err {
}
return nil
}

View File

@ -0,0 +1,7 @@
package fasthttp
import "git.loafle.net/commons_go/rpc/server"
type ServletHandler interface {
server.ServletHandler
}

View File

@ -0,0 +1,58 @@
package fasthttp
import (
"fmt"
"git.loafle.net/commons_go/rpc/protocol"
"git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/websocket_fasthttp/websocket"
)
type ServletHandlers struct {
server.ServletHandlers
}
func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) {
conn := reader.(*websocket.Conn)
_, r, err := conn.NextReader()
requestCodec, err := codec.NewRequest(r)
return requestCodec, err
}
func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error {
conn := writer.(*websocket.Conn)
wc, lerr := conn.NextWriter(websocket.TextMessage)
if nil != lerr {
}
if nil != err {
if lerr := requestCodec.WriteError(wc, 500, err); nil != lerr {
}
} else {
if err := requestCodec.WriteResponse(wc, result); nil != err {
}
}
return fmt.Errorf("Servlet Handler: SendResponse is not implemented")
}
func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error {
conn := writer.(*websocket.Conn)
wc, lerr := conn.NextWriter(websocket.TextMessage)
if nil != lerr {
}
if err := codec.WriteNotification(wc, method, args); nil != err {
}
return fmt.Errorf("Servlet Handler: SendNotification is not implemented")
}