From 2286a5021eec50b041a730bdd1d3e2f4fd203e08 Mon Sep 17 00:00:00 2001 From: crusader Date: Sun, 26 Nov 2017 19:15:51 +0900 Subject: [PATCH] ing --- .gitignore | 2 - client/client.go | 82 +++-- client/{call.go => request_state.go} | 41 +-- codec/codec.go | 23 ++ codec/selector.go | 25 ++ constants.go | 5 + encode/encode.go | 23 -- encode/selector.go | 25 -- gateway/handle.go | 19 - gateway/server_handler.go | 13 - gateway/server_handlers.go | 21 -- gateway/servlet_handler.go | 8 + gateway/servlet_handlers.go | 9 + handle.go | 39 --- notify/constants.go | 7 - notify/notify.go | 163 --------- notify/notify_handler.go | 12 - notify/notify_handlers.go | 32 -- protocol/client_codec.go | 53 +-- protocol/json/client.go | 224 +++++------- protocol/json/client_notification.go | 97 ++++++ protocol/json/client_notify.go | 103 ------ protocol/json/client_request.go | 51 +++ protocol/json/client_response.go | 81 +++-- protocol/json/server.go | 329 +++--------------- protocol/json/server_notification.go | 38 ++ protocol/json/server_request.go | 238 +++++++++++++ protocol/json/server_response.go | 52 +++ protocol/registry_codec.go | 2 +- protocol/server_codec.go | 9 +- server/handle.go | 14 - server/server_handler.go | 12 - server/server_handlers.go | 23 -- server/servlet_handler.go | 7 + server/servlet_handlers.go | 36 ++ server_handler.go | 28 -- server_handlers.go | 86 ----- servlet.go | 208 +++++++++++ servlet_handler.go | 21 ++ servlet_handlers.go | 82 +++++ transport/socket/servlet_handler.go | 7 + transport/socket/servlet_handlers.go | 45 +++ .../websocket/fasthttp/servlet_handler.go | 7 + .../websocket/fasthttp/servlet_handlers.go | 58 +++ 44 files changed, 1288 insertions(+), 1172 deletions(-) rename client/{call.go => request_state.go} (59%) create mode 100644 codec/codec.go create mode 100644 codec/selector.go create mode 100644 constants.go delete mode 100644 encode/encode.go delete mode 100644 encode/selector.go delete mode 100644 gateway/handle.go delete mode 100644 gateway/server_handler.go delete mode 100644 gateway/server_handlers.go create mode 100644 gateway/servlet_handler.go create mode 100644 gateway/servlet_handlers.go delete mode 100644 handle.go delete mode 100644 notify/constants.go delete mode 100644 notify/notify.go delete mode 100644 notify/notify_handler.go delete mode 100644 notify/notify_handlers.go create mode 100644 protocol/json/client_notification.go delete mode 100644 protocol/json/client_notify.go create mode 100644 protocol/json/client_request.go create mode 100644 protocol/json/server_notification.go create mode 100644 protocol/json/server_request.go create mode 100644 protocol/json/server_response.go delete mode 100644 server/handle.go delete mode 100644 server/server_handler.go delete mode 100644 server/server_handlers.go create mode 100644 server/servlet_handler.go create mode 100644 server/servlet_handlers.go delete mode 100644 server_handler.go delete mode 100644 server_handlers.go create mode 100644 servlet.go create mode 100644 servlet_handler.go create mode 100644 servlet_handlers.go create mode 100644 transport/socket/servlet_handler.go create mode 100644 transport/socket/servlet_handlers.go create mode 100644 transport/websocket/fasthttp/servlet_handler.go create mode 100644 transport/websocket/fasthttp/servlet_handlers.go diff --git a/.gitignore b/.gitignore index a5cf485..3733e36 100644 --- a/.gitignore +++ b/.gitignore @@ -66,5 +66,3 @@ glide.lock .DS_Store dist/ debug - - diff --git a/client/client.go b/client/client.go index 041b1d5..82cd311 100644 --- a/client/client.go +++ b/client/client.go @@ -26,7 +26,7 @@ type Client interface { Connect() error Close() - Notify(method string, args ...interface{}) (err error) + Send(method string, args ...interface{}) (err error) Call(result interface{}, method string, args ...interface{}) error CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) } @@ -37,10 +37,10 @@ type client struct { conn net.Conn pendingRequestsCount uint32 - pendingRequests map[uint64]*CallState + pendingRequests map[uint64]*RequestState pendingRequestsLock sync.Mutex - requestQueueChan chan *CallState + requestQueueChan chan *RequestState stopChan chan struct{} stopWg sync.WaitGroup @@ -58,8 +58,8 @@ func (c *client) Connect() error { return err } c.stopChan = make(chan struct{}) - c.requestQueueChan = make(chan *CallState, c.ch.GetPendingRequests()) - c.pendingRequests = make(map[uint64]*CallState) + c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests()) + c.pendingRequests = make(map[uint64]*RequestState) go c.handleRPC() @@ -75,8 +75,8 @@ func (c *client) Close() { c.stopChan = nil } -func (c *client) Notify(method string, args ...interface{}) (err error) { - var cs *CallState +func (c *client) Send(method string, args ...interface{}) (err error) { + var cs *RequestState if cs, err = c.send(true, false, nil, method, args...); nil != err { return } @@ -84,7 +84,7 @@ func (c *client) Notify(method string, args ...interface{}) (err error) { select { case <-cs.DoneChan: err = cs.Error - ReleaseCallState(cs) + releaseCallState(cs) } return @@ -95,7 +95,7 @@ func (c *client) Call(result interface{}, method string, args ...interface{}) er } func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) { - var cs *CallState + var cs *RequestState if cs, err = c.send(true, true, result, method, args...); nil != err { return } @@ -105,7 +105,7 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s select { case <-cs.DoneChan: result, err = cs.Result, cs.Error - ReleaseCallState(cs) + releaseCallState(cs) case <-t.C: cs.Cancel() err = getClientTimeoutError(c, timeout) @@ -116,21 +116,21 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s return } -func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *CallState, err error) { +func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *RequestState, err error) { if !hasResponse { usePool = true } if usePool { - cs = RetainCallState() + cs = retainRequestState() } else { - cs = &CallState{} + cs = &RequestState{} } cs.hasResponse = hasResponse cs.Method = method cs.Args = args - cs.DoneChan = make(chan *CallState, 1) + cs.DoneChan = make(chan *RequestState, 1) if hasResponse { cs.ID = c.ch.GetRequestID() @@ -149,7 +149,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method // Immediately notify the caller not interested // in the response on requests' queue overflow, since // there are no other ways to notify it later. - ReleaseCallState(cs) + releaseCallState(cs) return nil, getClientOverflowError(c) } @@ -160,7 +160,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method //close(rcs.DoneChan) rcs.Done() } else { - ReleaseCallState(rcs) + releaseCallState(rcs) } default: } @@ -171,7 +171,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method default: // Release m even if usePool = true, since m wasn't exposed // to the caller yet. - ReleaseCallState(cs) + releaseCallState(cs) return nil, getClientOverflowError(c) } } @@ -229,7 +229,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { }() for { - var cs *CallState + var cs *RequestState select { case cs = <-c.requestQueueChan: @@ -250,7 +250,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { // close(m.done) cs.Done() } else { - ReleaseCallState(cs) + releaseCallState(cs) } continue } @@ -268,11 +268,12 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { continue } } + var requestID interface{} if 0 < cs.ID { requestID = cs.ID } - err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, requestID) + err = c.ch.GetCodec().WriteRequest(c.conn, cs.Method, cs.Args, requestID) if !cs.hasResponse { cs.Error = err cs.Done() @@ -302,7 +303,7 @@ func (c *client) rpcReader(readerDone chan<- error) { }() for { - crn, err := c.ch.GetCodec().NewResponseOrNotify(c.conn) + msg, err := c.ch.GetCodec().NewMessage(c.conn) if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { logging.Logger().Info("Client: disconnected from server") @@ -313,11 +314,14 @@ func (c *client) rpcReader(readerDone chan<- error) { continue } - if crn.IsResponse() { - err = c.responseHandle(crn.GetResponse()) - } else { - err = c.notifyHandle(crn.GetNotify()) + switch msg.MessageType() { + case protocol.MessageTypeResponse: + c.handleResponse(msg) + case protocol.MessageTypeNotification: + c.handleNotification(msg) + default: } + if nil != err { logging.Logger().Error(err.Error()) continue @@ -326,9 +330,16 @@ func (c *client) rpcReader(readerDone chan<- error) { } -func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) error { +func (c *client) handleResponse(msg protocol.ClientMessageCodec) error { + codec, err := msg.MessageCodec() + if nil != err { + return err + } + + resCodec := codec.(protocol.ClientResponseCodec) + c.pendingRequestsLock.Lock() - id := reflect.ValueOf(codecResponse.ID()).Convert(uint64Type).Uint() + id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint() cs, ok := c.pendingRequests[id] if ok { @@ -337,15 +348,15 @@ func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) erro c.pendingRequestsLock.Unlock() if !ok { - return fmt.Errorf("Client: Unexpected ID=[%v] obtained from server", codecResponse.ID()) + return fmt.Errorf("Client: Unexpected ID=[%v] obtained from server", resCodec.ID()) } atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0)) - if err := codecResponse.Result(cs.Result); nil != err { + if err := resCodec.Result(cs.Result); nil != err { log.Printf("responseHandle:%v", err) } - if err := codecResponse.Error(); nil != err { + if err := resCodec.Error(); nil != err { log.Printf("responseHandle:%v", err) // cs.Error = &ClientError{ // Server: true, @@ -358,8 +369,15 @@ func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) erro return nil } -func (c *client) notifyHandle(codecNotify protocol.ClientCodecNotify) error { - _, err := c.ch.GetRPCRegistry().Invoke(codecNotify) +func (c *client) handleNotification(msg protocol.ClientMessageCodec) error { + codec, err := msg.MessageCodec() + if nil != err { + return err + } + + notiCodec := codec.(protocol.ClientNotificationCodec) + + _, err = c.ch.GetRPCRegistry().Invoke(notiCodec) return err } diff --git a/client/call.go b/client/request_state.go similarity index 59% rename from client/call.go rename to client/request_state.go index f20247b..b991601 100644 --- a/client/call.go +++ b/client/request_state.go @@ -6,24 +6,23 @@ import ( "time" ) -var callStatePool sync.Pool var zeroTime time.Time -type CallState struct { +type RequestState struct { ID uint64 Method string Args interface{} Result interface{} Error error - DoneChan chan *CallState + DoneChan chan *RequestState hasResponse bool canceled uint32 } -func (cs *CallState) Done() { +func (rs *RequestState) Done() { select { - case cs.DoneChan <- cs: + case rs.DoneChan <- rs: // ok default: // We don't want to block here. It is the caller's responsibility to make @@ -42,28 +41,30 @@ func (cs *CallState) Done() { // // It is safe calling this function multiple times from concurrently // running goroutines. -func (cs *CallState) Cancel() { - atomic.StoreUint32(&cs.canceled, 1) +func (rs *RequestState) Cancel() { + atomic.StoreUint32(&rs.canceled, 1) } -func (cs *CallState) IsCanceled() bool { - return atomic.LoadUint32(&cs.canceled) != 0 +func (rs *RequestState) IsCanceled() bool { + return atomic.LoadUint32(&rs.canceled) != 0 } -func RetainCallState() *CallState { - v := callStatePool.Get() +var requestStatePool sync.Pool + +func retainRequestState() *RequestState { + v := requestStatePool.Get() if v == nil { - return &CallState{} + return &RequestState{} } - return v.(*CallState) + return v.(*RequestState) } -func ReleaseCallState(cs *CallState) { - cs.Method = "" - cs.Args = nil - cs.Result = nil - cs.Error = nil - cs.DoneChan = nil +func releaseCallState(rs *RequestState) { + rs.Method = "" + rs.Args = nil + rs.Result = nil + rs.Error = nil + rs.DoneChan = nil - callStatePool.Put(cs) + requestStatePool.Put(rs) } diff --git a/codec/codec.go b/codec/codec.go new file mode 100644 index 0000000..52643d0 --- /dev/null +++ b/codec/codec.go @@ -0,0 +1,23 @@ +package codec + +import "io" + +// Codec interface contains the encoder for response or decoder for request. +// Eg. gzip, flate compressions. +type Codec interface { + Encode(w io.Writer) io.Writer + Decode(r io.Reader) io.Reader +} + +type codec struct { +} + +func (_ *codec) Encode(w io.Writer) io.Writer { + return w +} + +func (_ *codec) Decode(r io.Reader) io.Reader { + return r +} + +var DefaultCodec = &codec{} diff --git a/codec/selector.go b/codec/selector.go new file mode 100644 index 0000000..902117f --- /dev/null +++ b/codec/selector.go @@ -0,0 +1,25 @@ +package codec + +import "io" + +// CodecSelector interface provides a way to select encoder using the http +// request. Typically people can use this to check HEADER of the request and +// figure out client capabilities. +// Eg. "Accept-Encoding" tells about supported compressions. +type CodecSelector interface { + SelectByReader(r io.Reader) Codec + SelectByWriter(w io.Writer) Codec +} + +type codecSelector struct { +} + +func (_ *codecSelector) SelectByReader(_ io.Reader) Codec { + return DefaultCodec +} + +func (_ *codecSelector) SelectByWriter(_ io.Writer) Codec { + return DefaultCodec +} + +var DefaultCodecSelector = &codecSelector{} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..d38302b --- /dev/null +++ b/constants.go @@ -0,0 +1,5 @@ +package rpc + +const ( + DefaultPendingMessages = 32 * 1024 +) diff --git a/encode/encode.go b/encode/encode.go deleted file mode 100644 index 544c99d..0000000 --- a/encode/encode.go +++ /dev/null @@ -1,23 +0,0 @@ -package encode - -import "io" - -// Encoder interface contains the encoder for response. -// Eg. gzip, flate compressions. -type Encoder interface { - Encode(w io.Writer) io.Writer - Decode(r io.Reader) io.Reader -} - -type encoder struct { -} - -func (_ *encoder) Encode(w io.Writer) io.Writer { - return w -} - -func (_ *encoder) Decode(r io.Reader) io.Reader { - return r -} - -var DefaultEncoder = &encoder{} diff --git a/encode/selector.go b/encode/selector.go deleted file mode 100644 index 30f6c56..0000000 --- a/encode/selector.go +++ /dev/null @@ -1,25 +0,0 @@ -package encode - -import "io" - -// EncoderSelector interface provides a way to select encoder using the http -// request. Typically people can use this to check HEADER of the request and -// figure out client capabilities. -// Eg. "Accept-Encoding" tells about supported compressions. -type EncoderSelector interface { - SelectByReader(r io.Reader) Encoder - SelectByWriter(w io.Writer) Encoder -} - -type encoderSelector struct { -} - -func (_ *encoderSelector) SelectByReader(_ io.Reader) Encoder { - return DefaultEncoder -} - -func (_ *encoderSelector) SelectByWriter(_ io.Writer) Encoder { - return DefaultEncoder -} - -var DefaultEncoderSelector = &encoderSelector{} diff --git a/gateway/handle.go b/gateway/handle.go deleted file mode 100644 index ca2d4ae..0000000 --- a/gateway/handle.go +++ /dev/null @@ -1,19 +0,0 @@ -package gateway - -import ( - "context" - "io" - - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" -) - -func Handle(ctx context.Context, sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error { - return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) { - var params []string - if params, err = codecReq.Params(); nil != err { - return nil, err - } - return sh.Invoke(ctx, codecReq.Method(), params) - }) -} diff --git a/gateway/server_handler.go b/gateway/server_handler.go deleted file mode 100644 index 1bf118f..0000000 --- a/gateway/server_handler.go +++ /dev/null @@ -1,13 +0,0 @@ -package gateway - -import ( - "context" - - "git.loafle.net/commons_go/rpc" -) - -type ServerHandler interface { - rpc.ServerHandler - - Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) -} diff --git a/gateway/server_handlers.go b/gateway/server_handlers.go deleted file mode 100644 index fd8b9a0..0000000 --- a/gateway/server_handlers.go +++ /dev/null @@ -1,21 +0,0 @@ -package gateway - -import ( - "context" - "errors" - - "git.loafle.net/commons_go/rpc" -) - -type ServerHandlers struct { - rpc.ServerHandlers -} - -func (sh *ServerHandlers) Invoke(ctx context.Context, method string, params []string) (result interface{}, err error) { - - return nil, errors.New("Server: Handler method[Invoke] of Server is not implement") -} - -func (sh *ServerHandlers) Validate() { - sh.ServerHandlers.Validate() -} diff --git a/gateway/servlet_handler.go b/gateway/servlet_handler.go new file mode 100644 index 0000000..daf4804 --- /dev/null +++ b/gateway/servlet_handler.go @@ -0,0 +1,8 @@ +package gateway + + +import "git.loafle.net/commons_go/rpc" + +type ServletHandler interface { + rpc.ServletHandler +} diff --git a/gateway/servlet_handlers.go b/gateway/servlet_handlers.go new file mode 100644 index 0000000..6654cea --- /dev/null +++ b/gateway/servlet_handlers.go @@ -0,0 +1,9 @@ +package gateway + +import ( + "git.loafle.net/commons_go/rpc" +) + +type ServletHandlers struct { + rpc.ServletHandlers +} diff --git a/handle.go b/handle.go deleted file mode 100644 index d877b9e..0000000 --- a/handle.go +++ /dev/null @@ -1,39 +0,0 @@ -package rpc - -import ( - "io" - - "git.loafle.net/commons_go/rpc/protocol" -) - -type Invoker func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) - -func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer, invoker Invoker) error { - sh.OnPreRead(r) - // Create a new codec request. - codecReq, errNew := codec.NewRequest(r) - defer func() { - if nil != codecReq { - codecReq.Complete() - } - }() - - if nil != errNew { - return errNew - } - sh.OnPostRead(r) - - result, err := invoker(codecReq) - - if nil != err { - sh.OnPreWriteError(w, err) - codecReq.WriteError(w, 400, err) - sh.OnPostWriteError(w, err) - } else { - sh.OnPreWriteResult(w, result) - codecReq.WriteResponse(w, result) - sh.OnPostWriteResult(w, result) - } - - return nil -} diff --git a/notify/constants.go b/notify/constants.go deleted file mode 100644 index 1682d20..0000000 --- a/notify/constants.go +++ /dev/null @@ -1,7 +0,0 @@ -package notify - -const ( - // DefaultPendingNotifies is the default number of pending messages - // handled by Client and Server. - DefaultPendingNotifies = 1024 -) diff --git a/notify/notify.go b/notify/notify.go deleted file mode 100644 index 132027f..0000000 --- a/notify/notify.go +++ /dev/null @@ -1,163 +0,0 @@ -package notify - -import ( - "fmt" - "log" - "net" - "runtime" - "sync" - - "git.loafle.net/commons_go/rpc/client" -) - -func New(nh NotifyHandler) Notifier { - n := ¬ify{ - nh: nh, - } - return n -} - -type Notifier interface { - Start(conn net.Conn) - Close() - - Notify(method string, args ...interface{}) (err error) -} - -type notify struct { - nh NotifyHandler - - conn net.Conn - - notifyQueueChan chan *client.CallState - - stopChan chan struct{} - stopWg sync.WaitGroup -} - -func (n *notify) Start(conn net.Conn) { - n.nh.Validate() - - if n.stopChan != nil { - panic("RPC Notify: the given notify is already started. Call Notifier.Stop() before calling Notifier.Start(conn) again!") - } - - n.conn = conn - - n.stopChan = make(chan struct{}) - n.notifyQueueChan = make(chan *client.CallState, n.nh.GetPendingNotifies()) - - go n.handleRPC() -} - -func (n *notify) Close() { - if n.stopChan == nil { - panic("RPC Notify: the notify must be started before stopping it") - } - close(n.stopChan) - n.stopWg.Wait() - n.stopChan = nil -} - -func (n *notify) Notify(method string, args ...interface{}) (err error) { - var cs *client.CallState - if cs, err = n.send(method, args...); nil != err { - return - } - - select { - case <-cs.DoneChan: - err = cs.Error - client.ReleaseCallState(cs) - } - - return -} - -func (n *notify) send(method string, args ...interface{}) (cs *client.CallState, err error) { - cs = client.RetainCallState() - - cs.Method = method - cs.Args = args - cs.DoneChan = make(chan *client.CallState, 1) - - select { - case n.notifyQueueChan <- cs: - return cs, nil - default: - client.ReleaseCallState(cs) - return nil, getClientOverflowError(n) - } -} - -func (n *notify) handleRPC() { - subStopChan := make(chan struct{}) - - writerDone := make(chan error, 1) - go n.rpcWriter(subStopChan, writerDone) - - var err error - - select { - case err = <-writerDone: - close(subStopChan) - case <-n.stopChan: - close(subStopChan) - <-writerDone - } - - if err != nil { - //n.LogError("%s", err) - log.Printf("handleRPC: %v", err) - err = &client.ClientError{ - Connection: true, - Err: err, - } - } -} - -func (n *notify) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { - var err error - defer func() { - writerDone <- err - }() - - for { - var cs *client.CallState - - select { - case cs = <-n.notifyQueueChan: - default: - // Give the last chance for ready goroutines filling n.requestsChan :) - runtime.Gosched() - - select { - case <-stopChan: - return - case cs = <-n.notifyQueueChan: - } - } - - if cs.IsCanceled() { - client.ReleaseCallState(cs) - continue - } - - err = n.nh.GetCodec().Write(n.conn, cs.Method, cs.Args, nil) - cs.Error = err - cs.Done() - if nil != err { - err = fmt.Errorf("RPC Notify: Cannot send notify to wire: [%s]", err) - return - } - } -} - -func getClientOverflowError(n *notify) error { - err := fmt.Errorf("RPC Notify: Notifies' queue with size=%d is overflown. Try increasing NotifyHandler.PendingNotifies value", cap(n.notifyQueueChan)) - //c.LogError("%s", err) - return &client.ClientError{ - Overflow: true, - Err: err, - } -} diff --git a/notify/notify_handler.go b/notify/notify_handler.go deleted file mode 100644 index cf1db4c..0000000 --- a/notify/notify_handler.go +++ /dev/null @@ -1,12 +0,0 @@ -package notify - -import ( - "git.loafle.net/commons_go/rpc/protocol" -) - -type NotifyHandler interface { - GetCodec() protocol.ClientCodec - GetPendingNotifies() int - - Validate() -} diff --git a/notify/notify_handlers.go b/notify/notify_handlers.go deleted file mode 100644 index 8fd1ffd..0000000 --- a/notify/notify_handlers.go +++ /dev/null @@ -1,32 +0,0 @@ -package notify - -import ( - "git.loafle.net/commons_go/rpc/protocol" -) - -type NotifyHandlers struct { - Codec protocol.ClientCodec - - // The maximum number of pending requests in the queue. - // - // The number of pending requsts should exceed the expected number - // of concurrent goroutines calling client's methods. - // Otherwise a lot of ClientError.Overflow errors may appear. - // - // Default is DefaultPendingNotifies. - PendingNotifies int -} - -func (nh *NotifyHandlers) GetCodec() protocol.ClientCodec { - return nh.Codec -} - -func (nh *NotifyHandlers) GetPendingNotifies() int { - return nh.PendingNotifies -} - -func (nh *NotifyHandlers) Validate() { - if nh.PendingNotifies <= 0 { - nh.PendingNotifies = DefaultPendingNotifies - } -} diff --git a/protocol/client_codec.go b/protocol/client_codec.go index e8d4f1a..c77b25b 100644 --- a/protocol/client_codec.go +++ b/protocol/client_codec.go @@ -4,29 +4,42 @@ import ( "io" ) +type MessageType int + +const ( + MessageTypeUnknown MessageType = iota + MessageTypeRequest + MessageTypeResponse + MessageTypeNotification +) + // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec interface { - Write(w io.Writer, method string, args interface{}, id interface{}) error - NewResponseOrNotify(rc io.Reader) (ClientCodecResponseOrNotify, error) + WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error + NewMessage(rc io.Reader) (ClientMessageCodec, error) } -// ClientCodecResponseOrNotify encodes a response or notify using a specific -// serialization scheme. -type ClientCodecResponseOrNotify interface { - IsResponse() bool - IsNotify() bool - GetResponse() ClientCodecResponse - GetNotify() ClientCodecNotify - Complete() -} - -type ClientCodecResponse interface { - ID() interface{} - Result(result interface{}) error - Error() error - Complete() -} - -type ClientCodecNotify interface { +type ClientRequestCodec interface { + RegistryCodec +} + +// ClientMessageCodec decodes a response or notification using a specific +// serialization scheme. +type ClientMessageCodec interface { + MessageType() MessageType + // Reads the message filling the RPC response or notification. + MessageCodec() (interface{}, error) + + Close() +} + +type ClientResponseCodec interface { + Result(result interface{}) error + Error() error + ID() interface{} + Close() +} + +type ClientNotificationCodec interface { RegistryCodec } diff --git a/protocol/json/client.go b/protocol/json/client.go index 0a0adea..5fa8d82 100644 --- a/protocol/json/client.go +++ b/protocol/json/client.go @@ -6,48 +6,15 @@ import ( "io" "sync" - "git.loafle.net/commons_go/rpc/encode" + "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) -// ---------------------------------------------------------------------------- -// Request and Response -// ---------------------------------------------------------------------------- - -// clientRequest represents a JSON-RPC request sent by a client. -type clientRequest struct { - // JSON-RPC protocol. - Version string `json:"jsonrpc"` - - // A String containing the name of the method to be invoked. - Method string `json:"method"` - - // Object to pass as request parameter to the method. - Params interface{} `json:"params"` - - // The request id. This can be of any type. It is used to match the - // response with the request that it is replying to. - ID interface{} `json:"id"` -} - -// clientResponse represents a JSON-RPC response returned to a client. -type clientResponse struct { - Version string `json:"jsonrpc"` - Result *json.RawMessage `json:"result,omitempty"` - Error error `json:"error,omitempty"` - ID interface{} `json:"id,omitempty"` -} - -// clientRequest represents a JSON-RPC request sent by a client. -type clientNotify struct { - // JSON-RPC protocol. - Version string `json:"jsonrpc"` - - // A String containing the name of the method to be invoked. - Method string `json:"method"` - - // Object to pass as request parameter to the method. - Params *json.RawMessage `json:"params,omitempty"` +// clientMessage represents a JSON-RPC message sent to a client. +type clientMessage struct { + Version string `json:"jsonrpc"` + MessageType protocol.MessageType `json:"messageType"` + Message *json.RawMessage `json:"message"` } // ---------------------------------------------------------------------------- @@ -55,29 +22,27 @@ type clientNotify struct { // ---------------------------------------------------------------------------- // NewCustomClientCodec returns a new JSON Codec based on passed encoder selector. -func NewCustomClientCodec(encSel encode.EncoderSelector) *ClientCodec { - return &ClientCodec{encSel: encSel} +func NewCustomClientCodec(codecSel codec.CodecSelector) protocol.ClientCodec { + return &ClientCodec{codecSel: codecSel} } // NewClientCodec returns a new JSON Codec. -func NewClientCodec() *ClientCodec { - return NewCustomClientCodec(encode.DefaultEncoderSelector) +func NewClientCodec() protocol.ClientCodec { + return NewCustomClientCodec(codec.DefaultCodecSelector) } // ClientCodec creates a ClientCodecRequest to process each request. type ClientCodec struct { - encSel encode.EncoderSelector + codecSel codec.CodecSelector } -func (cc *ClientCodec) Write(w io.Writer, method string, args interface{}, id interface{}) error { +func (cc *ClientCodec) WriteRequest(w io.Writer, method string, args interface{}, id interface{}) error { req := retainClientRequest(method, args, id) defer func() { - if nil != req { - releaseClientRequest(req) - } + releaseClientRequest(req) }() - encoder := json.NewEncoder(cc.encSel.SelectByWriter(w).Encode(w)) + encoder := json.NewEncoder(cc.codecSel.SelectByWriter(w).Encode(w)) if err := encoder.Encode(req); nil != err { return err } @@ -85,126 +50,97 @@ func (cc *ClientCodec) Write(w io.Writer, method string, args interface{}, id in return nil } -// NewResponse returns a ClientCodecResponse. -func (cc *ClientCodec) NewResponseOrNotify(r io.Reader) (protocol.ClientCodecResponseOrNotify, error) { - return newClientCodecResponseOrNotify(r, cc.encSel.SelectByReader(r)) +// NewMessage returns a ClientMessageCodec. +func (cc *ClientCodec) NewMessage(r io.Reader) (protocol.ClientMessageCodec, error) { + return newClientMessageCodec(r, cc.codecSel.SelectByReader(r)) } -// newCodecRequest returns a new ServerCodecRequest. -func newClientCodecResponseOrNotify(r io.Reader, encoder encode.Encoder) (protocol.ClientCodecResponseOrNotify, error) { - // Decode the request body and check if RPC method is valid. - var raw json.RawMessage - dec := json.NewDecoder(r) - err := dec.Decode(&raw) - - if err == io.ErrUnexpectedEOF || err == io.EOF { - return nil, err - } +// newClientMessageCodec returns a new ClientMessageCodec. +func newClientMessageCodec(r io.Reader, codec codec.Codec) (protocol.ClientMessageCodec, error) { + msg := retainClientMessage() + err := json.NewDecoder(r).Decode(msg) if err != nil { + if err == io.ErrUnexpectedEOF || err == io.EOF { + return nil, err + } err = &Error{ Code: E_PARSE, Message: err.Error(), - Data: raw, + Data: msg, + } + } + if msg.Version != Version { + err = &Error{ + Code: E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: msg, } } - ccrn := retainClientCodecResponseOrNotify() + return retainClientMessageCodec(msg, err, codec), nil +} - if notify, err := newClientCodecNotify(raw); nil != err { - res, err := newClientCodecResponse(raw) - if nil != err { - releaseClientCodecResponseOrNotify(ccrn) - return nil, fmt.Errorf("Is not response or notification [%v]", raw) - } - ccrn.response = res - } else { - ccrn.notify = notify +type ClientMessageCodec struct { + msg *clientMessage + err error + codec codec.Codec +} + +func (ccm *ClientMessageCodec) MessageType() MessageType { + return ccm.msg.MessageType +} + +func (ccm *ClientMessageCodec) MessageCodec() (interface{}, error) { + switch ccm.msg.MessageType { + case protocol.MessageTypeResponse: + return newClientResponseCodec(ccm.msg.Message, ccm.codec) + case protocol.MessageTypeNotification: + return newClientNotificationCodec(ccm.msg.Message, ccm.codec) + } + return nil, fmt.Errorf("RPC: Not supported message type[%v]", ccm.msg.MessageType) +} + +func (ccm *ClientMessageCodec) Close() { + if nil != ccm.msg { + releaseClientMessage(ccm.msg) } - // if res, err := newClientCodecResponse(raw); nil != err { - // notify, err := newClientCodecNotify(raw) - // if nil != err { - // releaseClientCodecResponseOrNotify(ccrn) - // return nil, fmt.Errorf("Is not response or notification [%v]", raw) - // } - // ccrn.notify = notify - // } else { - // ccrn.response = res - // } - - return ccrn, nil + releaseClientMessageCodec(ccm) } -type ClientCodecResponseOrNotify struct { - notify protocol.ClientCodecNotify - response protocol.ClientCodecResponse -} +var clientMessagePool sync.Pool -func (ccrn *ClientCodecResponseOrNotify) IsResponse() bool { - return nil != ccrn.response -} - -func (ccrn *ClientCodecResponseOrNotify) IsNotify() bool { - return nil != ccrn.notify -} - -func (ccrn *ClientCodecResponseOrNotify) GetResponse() protocol.ClientCodecResponse { - return ccrn.response -} - -func (ccrn *ClientCodecResponseOrNotify) GetNotify() protocol.ClientCodecNotify { - return ccrn.notify -} - -func (ccrn *ClientCodecResponseOrNotify) Complete() { - if nil != ccrn.notify { - ccrn.notify.Complete() - } - if nil != ccrn.response { - ccrn.response.Complete() - } - releaseClientCodecResponseOrNotify(ccrn) -} - -var clientRequestPool sync.Pool - -func retainClientRequest(method string, params interface{}, id interface{}) *clientRequest { - var cr *clientRequest - v := clientRequestPool.Get() +func retainClientMessage() *clientMessage { + v := clientMessagePool.Get() if v == nil { - cr = &clientRequest{} - } else { - cr = v.(*clientRequest) + return &clientMessage{} } - - cr.Version = Version - cr.Method = method - cr.Params = params - cr.ID = id - - return cr + return v.(*clientMessage) } -func releaseClientRequest(cr *clientRequest) { - cr.Version = "" - cr.Method = "" - cr.Params = nil - cr.ID = nil - - clientRequestPool.Put(cr) +func releaseClientMessage(cm *clientMessage) { + clientMessagePool.Put(cm) } -var clientCodecResponseOrNotifyPool sync.Pool +var clientMessageCodecPool sync.Pool -func retainClientCodecResponseOrNotify() *ClientCodecResponseOrNotify { - v := clientCodecResponseOrNotifyPool.Get() +func retainClientMessageCodec(msg *clientMessage, err error, codec codec.Codec) *ClientMessageCodec { + var ccm *ClientMessageCodec + v := clientMessageCodecPool.Get() if v == nil { - return &ClientCodecResponseOrNotify{} + ccm = &ClientMessageCodec{} + } else { + ccm = v.(*ClientMessageCodec) } - return v.(*ClientCodecResponseOrNotify) + + ccm.msg = msg + ccm.err = err + ccm.codec = codec + + return ccm } -func releaseClientCodecResponseOrNotify(cr *ClientCodecResponseOrNotify) { +func releaseClientMessageCodec(cr *ClientMessageCodec) { - clientCodecResponseOrNotifyPool.Put(cr) + clientMessageCodecPool.Put(cr) } diff --git a/protocol/json/client_notification.go b/protocol/json/client_notification.go new file mode 100644 index 0000000..9314c25 --- /dev/null +++ b/protocol/json/client_notification.go @@ -0,0 +1,97 @@ +package json + +import ( + "encoding/json" + "sync" + + "git.loafle.net/commons_go/rpc/codec" + "git.loafle.net/commons_go/rpc/protocol" +) + +// ---------------------------------------------------------------------------- +// ClientNotificationCodec +// ---------------------------------------------------------------------------- +// clientRequest represents a JSON-RPC notification sent to a client. +type clientNotification struct { + // A String containing the name of the method to be invoked. + Method string `json:"method"` + // Object to pass as request parameter to the method. + Params *json.RawMessage `json:"params,omitempty"` +} + +// newClientNotificationCodec returns a new ClientNotificationCodec. +func newClientNotificationCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientNotificationCodec, error) { + // Decode the request body and check if RPC method is valid. + cnc := retainClientNotificationCodec() + + if err := json.Unmarshal(*raw, &cnc.notification); nil != err { + releaseClientNotificationCodec(cnc) + return nil, err + } + + return cnc, nil +} + +// ClientNotificationCodec decodes and encodes a single notification. +type ClientNotificationCodec struct { + notification clientNotification + err error +} + +func (crc *ClientNotificationCodec) Method() string { + return crc.notification.Method +} + +func (crc *ClientNotificationCodec) ReadParams(args []interface{}) error { + if crc.err == nil && crc.notification.Params != nil { + // Note: if scr.request.Params is nil it's not an error, it's an optional member. + // JSON params structured object. Unmarshal to the args object. + if err := json.Unmarshal(*crc.notification.Params, &args); err != nil { + crc.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: crc.notification.Params, + } + } + } + return crc.err +} + +func (crc *ClientNotificationCodec) Params() ([]string, error) { + if crc.err == nil && crc.notification.Params != nil { + var results []string + + if err := json.Unmarshal(*crc.notification.Params, &results); err != nil { + crc.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: crc.notification.Params, + } + return nil, crc.err + } + + return results, nil + } + return nil, crc.err +} + +func (crc *ClientNotificationCodec) Close() { + releaseClientNotificationCodec(crc) +} + +var clientNotificationCodecPool sync.Pool + +func retainClientNotificationCodec() *ClientNotificationCodec { + v := clientNotificationCodecPool.Get() + if v == nil { + return &ClientNotificationCodec{} + } + return v.(*ClientNotificationCodec) +} + +func releaseClientNotificationCodec(crc *ClientNotificationCodec) { + crc.notification.Method = "" + crc.notification.Params = nil + + clientNotificationCodecPool.Put(crc) +} diff --git a/protocol/json/client_notify.go b/protocol/json/client_notify.go deleted file mode 100644 index 783227b..0000000 --- a/protocol/json/client_notify.go +++ /dev/null @@ -1,103 +0,0 @@ -package json - -import ( - "encoding/json" - "fmt" - "sync" - - "git.loafle.net/commons_go/rpc/protocol" -) - -// ---------------------------------------------------------------------------- -// ClientCodecNotify -// ---------------------------------------------------------------------------- - -// newCodecRequest returns a new ClientCodecNotify. -func newClientCodecNotify(raw json.RawMessage) (protocol.ClientCodecNotify, error) { - // Decode the request body and check if RPC method is valid. - ccn := retainClientCodecNotify() - err := json.Unmarshal(raw, &ccn.notify) - if err != nil { - releaseClientCodecNotify(ccn) - return nil, err - } - if "" == ccn.notify.Method { - releaseClientCodecNotify(ccn) - return nil, fmt.Errorf("This is not Client Notify") - } - - if ccn.notify.Version != Version { - ccn.err = &Error{ - Code: E_INVALID_REQ, - Message: "jsonrpc must be " + Version, - Data: ccn.notify, - } - } - - return ccn, nil -} - -// ClientCodecNotify decodes and encodes a single notification. -type ClientCodecNotify struct { - notify clientNotify - err error -} - -func (ccn *ClientCodecNotify) Method() string { - return ccn.notify.Method -} - -func (ccn *ClientCodecNotify) ReadParams(args []interface{}) error { - if ccn.err == nil && ccn.notify.Params != nil { - // Note: if scr.request.Params is nil it's not an error, it's an optional member. - // JSON params structured object. Unmarshal to the args object. - if err := json.Unmarshal(*ccn.notify.Params, &args); err != nil { - ccn.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: ccn.notify.Params, - } - } - } - return ccn.err -} - -func (ccn *ClientCodecNotify) Params() ([]string, error) { - if ccn.err == nil && ccn.notify.Params != nil { - var results []string - - if err := json.Unmarshal(*ccn.notify.Params, &results); err != nil { - ccn.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: ccn.notify.Params, - } - return nil, ccn.err - } - - return results, nil - } - return nil, ccn.err -} - -func (ccn *ClientCodecNotify) Complete() { - releaseClientCodecNotify(ccn) -} - -var clientCodecNotifyPool sync.Pool - -func retainClientCodecNotify() *ClientCodecNotify { - v := clientCodecNotifyPool.Get() - if v == nil { - return &ClientCodecNotify{} - } - return v.(*ClientCodecNotify) -} - -func releaseClientCodecNotify(ccn *ClientCodecNotify) { - ccn.notify.Version = "" - ccn.notify.Method = "" - ccn.notify.Params = nil - - clientCodecNotifyPool.Put(ccn) -} diff --git a/protocol/json/client_request.go b/protocol/json/client_request.go new file mode 100644 index 0000000..967954f --- /dev/null +++ b/protocol/json/client_request.go @@ -0,0 +1,51 @@ +package json + +import "sync" + +// ---------------------------------------------------------------------------- +// Request and Response +// ---------------------------------------------------------------------------- + +// clientRequest represents a JSON-RPC request sent by a client. +type clientRequest struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // Object to pass as request parameter to the method. + Params interface{} `json:"params"` + + // The request id. This can be of any type. It is used to match the + // response with the request that it is replying to. + ID interface{} `json:"id"` +} + +var clientRequestPool sync.Pool + +func retainClientRequest(method string, params interface{}, id interface{}) *clientRequest { + var cr *clientRequest + v := clientRequestPool.Get() + if v == nil { + cr = &clientRequest{} + } else { + cr = v.(*clientRequest) + } + + cr.Version = Version + cr.Method = method + cr.Params = params + cr.ID = id + + return cr +} + +func releaseClientRequest(cr *clientRequest) { + cr.Version = "" + cr.Method = "" + cr.Params = nil + cr.ID = nil + + clientRequestPool.Put(cr) +} diff --git a/protocol/json/client_response.go b/protocol/json/client_response.go index 39db892..e0c003c 100644 --- a/protocol/json/client_response.go +++ b/protocol/json/client_response.go @@ -5,53 +5,53 @@ import ( "fmt" "sync" + "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) // ---------------------------------------------------------------------------- -// ClientCodecResponse +// ClientResponseCodec // ---------------------------------------------------------------------------- +// clientResponse represents a JSON-RPC response returned to a client. +type clientResponse struct { + Result *json.RawMessage `json:"result,omitempty"` + Error error `json:"error,omitempty"` + ID interface{} `json:"id,omitempty"` +} -// newClientCodecResponse returns a new ClientCodecResponse. -func newClientCodecResponse(raw json.RawMessage) (protocol.ClientCodecResponse, error) { +// newClientResponseCodec returns a new ClientResponseCodec. +func newClientResponseCodec(raw *json.RawMessage, codec codec.Codec) (protocol.ClientResponseCodec, error) { // Decode the request body and check if RPC method is valid. - ccr := retainClientCodecResponse() - err := json.Unmarshal(raw, &ccr.response) + ccr := retainClientResponseCodec() + + err := json.Unmarshal(*raw, &ccr.res) if err != nil { - releaseClientCodecResponse(ccr) + releaseClientResponseCodec(ccr) return nil, err } - if nil == ccr.response.Result && nil == ccr.response.Error { - releaseClientCodecResponse(ccr) + if nil == ccr.res.Result && nil == ccr.res.Error { + releaseClientResponseCodec(ccr) return nil, fmt.Errorf("This is not Response") } - if ccr.response.Version != Version { - ccr.err = &Error{ - Code: E_INVALID_REQ, - Message: "jsonrpc must be " + Version, - Data: ccr.response, - } - } - return ccr, nil } -// ClientCodecResponse decodes and encodes a single request. -type ClientCodecResponse struct { - response clientResponse - err error +// ClientResponseCodec decodes and encodes a single request. +type ClientResponseCodec struct { + res clientResponse + err error } -func (ccr *ClientCodecResponse) ID() interface{} { - return ccr.response.ID +func (ccr *ClientResponseCodec) ID() interface{} { + return ccr.res.ID } -func (ccr *ClientCodecResponse) Result(result interface{}) error { - if ccr.err == nil && ccr.response.Result != nil { - if err := json.Unmarshal(*ccr.response.Result, &result); err != nil { +func (ccr *ClientResponseCodec) Result(result interface{}) error { + if ccr.err == nil && ccr.res.Result != nil { + if err := json.Unmarshal(*ccr.res.Result, &result); err != nil { params := [1]interface{}{result} - if err = json.Unmarshal(*ccr.response.Result, ¶ms); err != nil { + if err = json.Unmarshal(*ccr.res.Result, ¶ms); err != nil { ccr.err = err } } @@ -59,29 +59,28 @@ func (ccr *ClientCodecResponse) Result(result interface{}) error { return ccr.err } -func (ccr *ClientCodecResponse) Error() error { - return ccr.response.Error +func (ccr *ClientResponseCodec) Error() error { + return ccr.res.Error } -func (ccr *ClientCodecResponse) Complete() { - releaseClientCodecResponse(ccr) +func (ccr *ClientResponseCodec) Close() { + releaseClientResponseCodec(ccr) } -var clientCodecResponsePool sync.Pool +var clientResponseCodecPool sync.Pool -func retainClientCodecResponse() *ClientCodecResponse { - v := clientCodecResponsePool.Get() +func retainClientResponseCodec() *ClientResponseCodec { + v := clientResponseCodecPool.Get() if v == nil { - return &ClientCodecResponse{} + return &ClientResponseCodec{} } - return v.(*ClientCodecResponse) + return v.(*ClientResponseCodec) } -func releaseClientCodecResponse(ccr *ClientCodecResponse) { - ccr.response.Version = "" - ccr.response.Result = nil - ccr.response.Error = nil - ccr.response.ID = 0 +func releaseClientResponseCodec(ccr *ClientResponseCodec) { + ccr.res.Result = nil + ccr.res.Error = nil + ccr.res.ID = 0 - clientCodecResponsePool.Put(ccr) + clientResponseCodecPool.Put(ccr) } diff --git a/protocol/json/server.go b/protocol/json/server.go index d4719a1..837c6b3 100644 --- a/protocol/json/server.go +++ b/protocol/json/server.go @@ -5,50 +5,17 @@ import ( "io" "sync" - "git.loafle.net/commons_go/rpc/encode" + "git.loafle.net/commons_go/rpc/codec" "git.loafle.net/commons_go/rpc/protocol" ) var null = json.RawMessage([]byte("null")) -// ---------------------------------------------------------------------------- -// Request and Response -// ---------------------------------------------------------------------------- - -// serverRequest represents a JSON-RPC request received by the server. -type serverRequest struct { +type serverMessage struct { // JSON-RPC protocol. - Version string `json:"jsonrpc"` - - // A String containing the name of the method to be invoked. - Method string `json:"method"` - - // A Structured value to pass as arguments to the method. - Params *json.RawMessage `json:"params,omitempty"` - - // The request id. MUST be a string, number or null. - // Our implementation will not do type checking for id. - // It will be copied as it is. - ID *json.RawMessage `json:"id,omitempty"` -} - -// serverResponse represents a JSON-RPC response returned by the server. -type serverResponse struct { - // JSON-RPC protocol. - Version string `json:"jsonrpc"` - - // The Object that was returned by the invoked method. This must be null - // in case there was an error invoking the method. - // As per spec the member will be omitted if there was an error. - Result interface{} `json:"result,omitempty"` - - // An Error object if there was an error invoking the method. It must be - // null if there was no error. - // As per spec the member will be omitted if there was no error. - Error *Error `json:"error,omitempty"` - - // This must be the same id as the request it is responding to. - ID *json.RawMessage `json:"id,omitempty"` + Version string `json:"jsonrpc"` + MessageType protocol.MessageType `json:"messageType"` + Message interface{} `json:"message"` } // ---------------------------------------------------------------------------- @@ -56,279 +23,65 @@ type serverResponse struct { // ---------------------------------------------------------------------------- // NewCustomServerCodec returns a new JSON Codec based on passed encoder selector. -func NewCustomServerCodec(encSel encode.EncoderSelector) *ServerCodec { - return &ServerCodec{encSel: encSel} +func NewCustomServerCodec(codecSel codec.CodecSelector) protocol.ServerCodec { + return &ServerCodec{codecSel: codecSel} } // NewServerCodec returns a new JSON Codec. -func NewServerCodec() *ServerCodec { - return NewCustomServerCodec(encode.DefaultEncoderSelector) +func NewServerCodec() protocol.ServerCodec { + return NewCustomServerCodec(codec.DefaultCodecSelector) } -// ServerCodec creates a ServerCodecRequest to process each request. +// ServerCodec creates a ServerRequestCodec to process each request. type ServerCodec struct { - encSel encode.EncoderSelector - notifyMtx sync.Mutex - notify clientRequest + codecSel codec.CodecSelector } -// NewRequest returns a ServerCodecRequest. -func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerCodecRequest, error) { - return newServerCodecRequest(r, sc.encSel.SelectByReader(r)) +// NewRequest returns a ServerRequestCodec. +func (sc *ServerCodec) NewRequest(r io.Reader) (protocol.ServerRequestCodec, error) { + return newServerRequestCodec(r, sc.codecSel.SelectByReader(r)) } -// WriteNotify send a notification from server to client. -func (sc *ServerCodec) WriteNotify(w io.Writer, method string, args interface{}) error { - sc.notifyMtx.Lock() - - sc.notify.Version = Version - sc.notify.Method = method - sc.notify.Params = args - - encoder := json.NewEncoder(sc.encSel.SelectByWriter(w).Encode(w)) - err := encoder.Encode(&sc.notify) - sc.notifyMtx.Unlock() +// WriteNotification send a notification from server to client. +func (sc *ServerCodec) WriteNotification(w io.Writer, method string, args interface{}) error { + noti := retainServerNotification(method, args) + defer func() { + releaseServerNotification(noti) + }() + msg := retainServerMessage(protocol.MessageTypeNotification, noti) + defer func() { + releaseServerMessage(msg) + }() + encoder := json.NewEncoder(sc.codecSel.SelectByWriter(w).Encode(w)) // Not sure in which case will this happen. But seems harmless. - if err != nil { + if err := encoder.Encode(msg); nil != err { return err } return nil } -// ---------------------------------------------------------------------------- -// ServerCodecRequest -// ---------------------------------------------------------------------------- +var serverMessagePool sync.Pool -// newCodecRequest returns a new ServerCodecRequest. -func newServerCodecRequest(r io.Reader, encoder encode.Encoder) (protocol.ServerCodecRequest, error) { - // Decode the request body and check if RPC method is valid. - req := retainServerRequest() - err := json.NewDecoder(r).Decode(req) - if err == io.ErrUnexpectedEOF || err == io.EOF { - return nil, err - } - if err != nil { - err = &Error{ - Code: E_PARSE, - Message: err.Error(), - Data: req, - } - } - if req.Version != Version { - err = &Error{ - Code: E_INVALID_REQ, - Message: "jsonrpc must be " + Version, - Data: req, - } - } - - return retainServerCodecRequest(req, err, encoder), nil -} - -// CodecRequest decodes and encodes a single request. -type ServerCodecRequest struct { - request *serverRequest - err error - encoder encode.Encoder -} - -// Complete is callback function that end of request. -func (scr *ServerCodecRequest) Complete() { - if nil != scr.request { - releaseServerRequest(scr.request) - } - releaseServerCodecRequest(scr) -} - -// Method returns the RPC method for the current request. -// -// The method uses a dotted notation as in "Service.Method". -func (scr *ServerCodecRequest) Method() string { - return scr.request.Method -} - -// ReadRequest fills the request object for the RPC method. -// -// ReadRequest parses request parameters in two supported forms in -// accordance with http://www.jsonrpc.org/specification#parameter_structures -// -// by-position: params MUST be an Array, containing the -// values in the Server expected order. -// -// by-name: params MUST be an Object, with member names -// that match the Server expected parameter names. The -// absence of expected names MAY result in an error being -// generated. The names MUST match exactly, including -// case, to the method's expected parameters. -func (scr *ServerCodecRequest) ReadParams(args []interface{}) error { - if scr.err == nil && scr.request.Params != nil { - // Note: if scr.request.Params is nil it's not an error, it's an optional member. - // JSON params structured object. Unmarshal to the args object. - - // if err := json.Unmarshal(*scr.request.Params, &args); err != nil { - // // Clearly JSON params is not a structured object, - // // fallback and attempt an unmarshal with JSON params as - // // array value and RPC params is struct. Unmarshal into - // // array containing the request struct. - // scr.err = &Error{ - // Code: E_INVALID_REQ, - // Message: err.Error(), - // Data: scr.request.Params, - // } - // } - raws := make([]json.RawMessage, len(args)) - if err := json.Unmarshal(*scr.request.Params, &raws); err != nil { - scr.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: scr.request.Params, - } - return scr.err - } - - for indexI := 0; indexI < len(args); indexI++ { - raw := raws[indexI] - arg := args[indexI] - if err := json.Unmarshal(raw, &arg); err != nil { - scr.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: scr.request.Params, - } - return scr.err - } - } - } - return scr.err -} - -func (scr *ServerCodecRequest) Params() ([]string, error) { - if scr.err == nil && scr.request.Params != nil { - var results []string - - if err := json.Unmarshal(*scr.request.Params, &results); err != nil { - scr.err = &Error{ - Code: E_INVALID_REQ, - Message: err.Error(), - Data: scr.request.Params, - } - return nil, scr.err - } - - return results, nil - } - return nil, scr.err -} - -// WriteResponse encodes the response and writes it to the ResponseWriter. -func (scr *ServerCodecRequest) WriteResponse(w io.Writer, reply interface{}) error { - res := retainServerResponse(Version, reply, nil, scr.request.ID) - return scr.writeServerResponse(w, res) -} - -// WriteError encodes the response and writes it to the ResponseWriter. -func (scr *ServerCodecRequest) WriteError(w io.Writer, status int, err error) error { - jsonErr, ok := err.(*Error) - if !ok { - jsonErr = &Error{ - Code: E_SERVER, - Message: err.Error(), - } - } - res := retainServerResponse(Version, nil, jsonErr, scr.request.ID) - return scr.writeServerResponse(w, res) -} - -func (scr *ServerCodecRequest) writeServerResponse(w io.Writer, res *serverResponse) error { - defer func() { - if nil != res { - releaseServerResponse(res) - } - }() - // ID is null for notifications and they don't have a response. - if scr.request.ID != nil { - encoder := json.NewEncoder(scr.encoder.Encode(w)) - err := encoder.Encode(res) - - // Not sure in which case will this happen. But seems harmless. - if err != nil { - return err - } - } - return nil -} - -type EmptyResponse struct { -} - -var serverCodecRequestPool sync.Pool - -func retainServerCodecRequest(request *serverRequest, err error, encoder encode.Encoder) *ServerCodecRequest { - var scr *ServerCodecRequest - v := serverCodecRequestPool.Get() +func retainServerMessage(msgType protocol.MessageType, msg interface{}) *serverMessage { + var sm *serverMessage + v := serverMessagePool.Get() if v == nil { - scr = &ServerCodecRequest{} + sm = &serverMessage{} } else { - scr = v.(*ServerCodecRequest) + sm = v.(*serverMessage) } - scr.request = request - scr.err = err - scr.encoder = encoder + sm.Version = Version + sm.MessageType = msgType + sm.Message = msg - return scr + return sm } -func releaseServerCodecRequest(scr *ServerCodecRequest) { - scr.request = nil - scr.err = nil - scr.encoder = nil +func releaseServerMessage(sm *serverMessage) { + sm.Version = "" + sm.MessageType = protocol.MessageTypeUnknown + sm.Message = nil - serverCodecRequestPool.Put(scr) -} - -var serverRequestPool sync.Pool - -func retainServerRequest() *serverRequest { - v := serverRequestPool.Get() - if v == nil { - return &serverRequest{} - } - return v.(*serverRequest) -} - -func releaseServerRequest(sr *serverRequest) { - sr.Method = "" - sr.Params = nil - sr.ID = nil - - serverRequestPool.Put(sr) -} - -var serverResponsePool sync.Pool - -func retainServerResponse(version string, result interface{}, err *Error, id *json.RawMessage) *serverResponse { - var sr *serverResponse - v := serverResponsePool.Get() - if v == nil { - sr = &serverResponse{} - } else { - sr = v.(*serverResponse) - } - - sr.Version = version - sr.Result = result - sr.Error = err - sr.ID = id - - return sr -} - -func releaseServerResponse(sr *serverResponse) { - sr.Version = "" - sr.Result = nil - sr.Error = nil - sr.ID = nil - - serverResponsePool.Put(sr) + serverMessagePool.Put(sm) } diff --git a/protocol/json/server_notification.go b/protocol/json/server_notification.go new file mode 100644 index 0000000..4441add --- /dev/null +++ b/protocol/json/server_notification.go @@ -0,0 +1,38 @@ +package json + +import ( + "sync" +) + +// clientRequest represents a JSON-RPC request sent by a client. +type serverNotification struct { + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // Object to pass as request parameter to the method. + Params interface{} `json:"params"` +} + +var serverNotificationPool sync.Pool + +func retainServerNotification(method string, args interface{}) *serverNotification { + var sn *serverNotification + v := serverNotificationPool.Get() + if v == nil { + sn = &serverNotification{} + } else { + sn = v.(*serverNotification) + } + + sn.Method = method + sn.Params = args + + return sn +} + +func releaseServerNotification(sn *serverNotification) { + sn.Method = "" + sn.Params = nil + + serverNotificationPool.Put(sn) +} diff --git a/protocol/json/server_request.go b/protocol/json/server_request.go new file mode 100644 index 0000000..faab653 --- /dev/null +++ b/protocol/json/server_request.go @@ -0,0 +1,238 @@ +package json + +import ( + "encoding/json" + "io" + "sync" + + "git.loafle.net/commons_go/rpc/codec" + "git.loafle.net/commons_go/rpc/protocol" +) + +// ---------------------------------------------------------------------------- +// Request +// ---------------------------------------------------------------------------- + +// serverRequest represents a JSON-RPC request received by the server. +type serverRequest struct { + // JSON-RPC protocol. + Version string `json:"jsonrpc"` + + // A String containing the name of the method to be invoked. + Method string `json:"method"` + + // A Structured value to pass as arguments to the method. + Params *json.RawMessage `json:"params,omitempty"` + + // The request id. MUST be a string, number or null. + // Our implementation will not do type checking for id. + // It will be copied as it is. + ID *json.RawMessage `json:"id,omitempty"` +} + +// ---------------------------------------------------------------------------- +// ServerRequestCodec +// ---------------------------------------------------------------------------- + +// newRequestCodec returns a new ServerRequestCodec. +func newServerRequestCodec(r io.Reader, codec codec.Codec) (protocol.ServerRequestCodec, error) { + // Decode the request body and check if RPC method is valid. + req := retainServerRequest() + err := json.NewDecoder(r).Decode(req) + if err == io.ErrUnexpectedEOF || err == io.EOF { + return nil, err + } + if err != nil { + err = &Error{ + Code: E_PARSE, + Message: err.Error(), + Data: req, + } + } + if req.Version != Version { + err = &Error{ + Code: E_INVALID_REQ, + Message: "jsonrpc must be " + Version, + Data: req, + } + } + + return retainServerRequestCodec(req, err, codec), nil +} + +// ServerRequestCodec decodes and encodes a single request. +type ServerRequestCodec struct { + req *serverRequest + err error + codec codec.Codec +} + +// Close is callback function that end of request. +func (src *ServerRequestCodec) Close() { + if nil != src.req { + releaseServerRequest(src.req) + } + releaseServerRequestCodec(src) +} + +// Method returns the RPC method for the current request. +// +// The method uses a dotted notation as in "Service.Method". +func (src *ServerRequestCodec) Method() string { + return src.req.Method +} + +// ReadRequest fills the request object for the RPC method. +// +// ReadRequest parses request parameters in two supported forms in +// accordance with http://www.jsonrpc.org/specification#parameter_structures +// +// by-position: params MUST be an Array, containing the +// values in the Server expected order. +// +// by-name: params MUST be an Object, with member names +// that match the Server expected parameter names. The +// absence of expected names MAY result in an error being +// generated. The names MUST match exactly, including +// case, to the method's expected parameters. +func (src *ServerRequestCodec) ReadParams(args []interface{}) error { + if src.err == nil && src.req.Params != nil { + // Note: if src.req.Params is nil it's not an error, it's an optional member. + // JSON params structured object. Unmarshal to the args object. + + // if err := json.Unmarshal(*src.req.Params, &args); err != nil { + // // Clearly JSON params is not a structured object, + // // fallback and attempt an unmarshal with JSON params as + // // array value and RPC params is struct. Unmarshal into + // // array containing the request struct. + // src.err = &Error{ + // Code: E_INVALID_REQ, + // Message: err.Error(), + // Data: src.req.Params, + // } + // } + raws := make([]json.RawMessage, len(args)) + if err := json.Unmarshal(*src.req.Params, &raws); err != nil { + src.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: src.req.Params, + } + return src.err + } + + for indexI := 0; indexI < len(args); indexI++ { + raw := raws[indexI] + arg := args[indexI] + if err := json.Unmarshal(raw, &arg); err != nil { + src.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: src.req.Params, + } + return src.err + } + } + } + return src.err +} + +func (src *ServerRequestCodec) Params() ([]string, error) { + if src.err == nil && src.req.Params != nil { + var results []string + + if err := json.Unmarshal(*src.req.Params, &results); err != nil { + src.err = &Error{ + Code: E_INVALID_REQ, + Message: err.Error(), + Data: src.req.Params, + } + return nil, src.err + } + + return results, nil + } + return nil, src.err +} + +// WriteResponse encodes the response and writes it to the ResponseWriter. +func (src *ServerRequestCodec) WriteResponse(w io.Writer, reply interface{}) error { + res := retainServerResponse(reply, nil, src.req.ID) + return src.writeServerResponse(w, res) +} + +// WriteError encodes the response and writes it to the ResponseWriter. +func (src *ServerRequestCodec) WriteError(w io.Writer, status int, err error) error { + jsonErr, ok := err.(*Error) + if !ok { + jsonErr = &Error{ + Code: E_SERVER, + Message: err.Error(), + } + } + res := retainServerResponse(nil, jsonErr, src.req.ID) + return src.writeServerResponse(w, res) +} + +func (src *ServerRequestCodec) writeServerResponse(w io.Writer, res *serverResponse) error { + defer func() { + releaseServerResponse(res) + }() + // ID is null for notifications and they don't have a response. + if src.req.ID != nil { + msg := retainServerMessage(protocol.MessageTypeResponse, res) + defer func() { + releaseServerMessage(msg) + }() + encoder := json.NewEncoder(src.codec.Encode(w)) + // Not sure in which case will this happen. But seems harmless. + if err := encoder.Encode(msg); nil != err { + return err + } + } + return nil +} + +var serverRequestCodecPool sync.Pool + +func retainServerRequestCodec(req *serverRequest, err error, codec codec.Codec) *ServerRequestCodec { + var src *ServerRequestCodec + v := serverRequestCodecPool.Get() + if v == nil { + src = &ServerRequestCodec{} + } else { + src = v.(*ServerRequestCodec) + } + + src.req = req + src.err = err + src.codec = codec + + return src +} + +func releaseServerRequestCodec(src *ServerRequestCodec) { + src.req = nil + src.err = nil + src.codec = nil + + serverRequestCodecPool.Put(src) +} + +var serverRequestPool sync.Pool + +func retainServerRequest() *serverRequest { + v := serverRequestPool.Get() + if v == nil { + return &serverRequest{} + } + return v.(*serverRequest) +} + +func releaseServerRequest(sr *serverRequest) { + sr.Method = "" + sr.Params = nil + sr.ID = nil + + serverRequestPool.Put(sr) +} diff --git a/protocol/json/server_response.go b/protocol/json/server_response.go new file mode 100644 index 0000000..1c15515 --- /dev/null +++ b/protocol/json/server_response.go @@ -0,0 +1,52 @@ +package json + +import ( + "encoding/json" + "sync" +) + +// ---------------------------------------------------------------------------- +// Response +// ---------------------------------------------------------------------------- + +// serverResponse represents a JSON-RPC response returned by the server. +type serverResponse struct { + // The Object that was returned by the invoked method. This must be null + // in case there was an error invoking the method. + // As per spec the member will be omitted if there was an error. + Result interface{} `json:"result,omitempty"` + + // An Error object if there was an error invoking the method. It must be + // null if there was no error. + // As per spec the member will be omitted if there was no error. + Error *Error `json:"error,omitempty"` + + // This must be the same id as the request it is responding to. + ID *json.RawMessage `json:"id,omitempty"` +} + +var serverResponsePool sync.Pool + +func retainServerResponse(result interface{}, err *Error, id *json.RawMessage) *serverResponse { + var sr *serverResponse + v := serverResponsePool.Get() + if v == nil { + sr = &serverResponse{} + } else { + sr = v.(*serverResponse) + } + + sr.Result = result + sr.Error = err + sr.ID = id + + return sr +} + +func releaseServerResponse(sr *serverResponse) { + sr.Result = nil + sr.Error = nil + sr.ID = nil + + serverResponsePool.Put(sr) +} diff --git a/protocol/registry_codec.go b/protocol/registry_codec.go index 44babe7..429663b 100644 --- a/protocol/registry_codec.go +++ b/protocol/registry_codec.go @@ -10,5 +10,5 @@ type RegistryCodec interface { // Reads the request filling the RPC method args. ReadParams(args []interface{}) error Params() ([]string, error) - Complete() + Close() } diff --git a/protocol/server_codec.go b/protocol/server_codec.go index bffb9c6..911322b 100644 --- a/protocol/server_codec.go +++ b/protocol/server_codec.go @@ -4,14 +4,15 @@ import ( "io" ) -// ServerCodec creates a ServerCodecRequest to process each request. +// ServerCodec creates a ServerRequestCodec to process each request. type ServerCodec interface { - NewRequest(r io.Reader) (ServerCodecRequest, error) + NewRequest(r io.Reader) (ServerRequestCodec, error) + WriteNotification(w io.Writer, method string, args interface{}) error } -// ServerCodecRequest decodes a request and encodes a response using a specific +// ServerRequestCodec decodes a request and encodes a response using a specific // serialization scheme. -type ServerCodecRequest interface { +type ServerRequestCodec interface { RegistryCodec WriteResponse(w io.Writer, reply interface{}) error WriteError(w io.Writer, status int, err error) error diff --git a/server/handle.go b/server/handle.go deleted file mode 100644 index 9407471..0000000 --- a/server/handle.go +++ /dev/null @@ -1,14 +0,0 @@ -package server - -import ( - "io" - - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" -) - -func Handle(sh ServerHandler, codec protocol.ServerCodec, r io.Reader, w io.Writer) error { - return rpc.Handle(sh, codec, r, w, func(codecReq protocol.ServerCodecRequest) (result interface{}, err error) { - return sh.Invoke(codecReq) - }) -} diff --git a/server/server_handler.go b/server/server_handler.go deleted file mode 100644 index ba3d4bb..0000000 --- a/server/server_handler.go +++ /dev/null @@ -1,12 +0,0 @@ -package server - -import ( - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" -) - -type ServerHandler interface { - rpc.ServerHandler - - Invoke(codec protocol.RegistryCodec) (result interface{}, err error) -} diff --git a/server/server_handlers.go b/server/server_handlers.go deleted file mode 100644 index 0e010af..0000000 --- a/server/server_handlers.go +++ /dev/null @@ -1,23 +0,0 @@ -package server - -import ( - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" -) - -type ServerHandlers struct { - rpc.ServerHandlers - RPCRegistry rpc.Registry -} - -func (sh *ServerHandlers) Invoke(codec protocol.RegistryCodec) (result interface{}, err error) { - return sh.RPCRegistry.Invoke(codec) -} - -func (sh *ServerHandlers) Validate() { - sh.ServerHandlers.Validate() - - if nil == sh.RPCRegistry { - panic("RPCRegistry must be specified.") - } -} diff --git a/server/servlet_handler.go b/server/servlet_handler.go new file mode 100644 index 0000000..05b9903 --- /dev/null +++ b/server/servlet_handler.go @@ -0,0 +1,7 @@ +package server + +import "git.loafle.net/commons_go/rpc" + +type ServletHandler interface { + rpc.ServletHandler +} diff --git a/server/servlet_handlers.go b/server/servlet_handlers.go new file mode 100644 index 0000000..eaebba4 --- /dev/null +++ b/server/servlet_handlers.go @@ -0,0 +1,36 @@ +package server + +import ( + "fmt" + + "git.loafle.net/commons_go/logging" + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" +) + +type ServletHandlers struct { + rpc.ServletHandlers + + RPCRegistry rpc.Registry +} + +func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) { + if !sh.RPCRegistry.HasMethod(requestCodec.Method()) { + return nil, fmt.Errorf("RPC Servlet Handler: Method[%s] is not exist", requestCodec.Method()) + } + + result, err = sh.RPCRegistry.Invoke(requestCodec) + if nil != err { + + } + + return +} + +func (sh *ServletHandlers) Validate() { + sh.ServletHandlers.Validate() + + if nil == sh.RPCRegistry { + logging.Logger().Panic(fmt.Sprintf("RPC Servlet Handler: RPC Registry must be specified")) + } +} diff --git a/server_handler.go b/server_handler.go deleted file mode 100644 index 6308760..0000000 --- a/server_handler.go +++ /dev/null @@ -1,28 +0,0 @@ -package rpc - -import ( - "io" - - "git.loafle.net/commons_go/rpc/protocol" -) - -type ServerHandler interface { - Init() error - OnStart() - - OnPreRead(r io.Reader) - OnPostRead(r io.Reader) - - OnPreWriteResult(w io.Writer, result interface{}) - OnPostWriteResult(w io.Writer, result interface{}) - - OnPreWriteError(w io.Writer, err error) - OnPostWriteError(w io.Writer, err error) - - OnStop() - - Validate() - - RegisterCodec(codec protocol.ServerCodec, contentType string) - GetCodec(contentType string) (protocol.ServerCodec, error) -} diff --git a/server_handlers.go b/server_handlers.go deleted file mode 100644 index 30c6e72..0000000 --- a/server_handlers.go +++ /dev/null @@ -1,86 +0,0 @@ -package rpc - -import ( - "fmt" - "io" - "strings" - - "git.loafle.net/commons_go/rpc/protocol" -) - -func NewServerHandler() ServerHandler { - sh := &ServerHandlers{} - - return sh -} - -type ServerHandlers struct { - codecs map[string]protocol.ServerCodec -} - -func (sh *ServerHandlers) Init() error { - - return nil -} - -func (sh *ServerHandlers) OnStart() { - // no op -} - -func (sh *ServerHandlers) OnPreRead(r io.Reader) { - // no op -} - -func (sh *ServerHandlers) OnPostRead(r io.Reader) { - // no op -} - -func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { - // no op -} - -func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) { - // no op -} - -func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { - // no op -} - -func (sh *ServerHandlers) OnStop() { - // no op -} - -func (sh *ServerHandlers) Validate() { -} - -// RegisterCodec adds a new codec to the server. -// -// Codecs are defined to process a given serialization scheme, e.g., JSON or -// XML. A codec is chosen based on the "Content-Type" header from the request, -// excluding the charset definition. -func (sh *ServerHandlers) RegisterCodec(codec protocol.ServerCodec, contentType string) { - if nil == sh.codecs { - sh.codecs = make(map[string]protocol.ServerCodec) - } - sh.codecs[strings.ToLower(contentType)] = codec -} - -func (sh *ServerHandlers) GetCodec(contentType string) (protocol.ServerCodec, error) { - var codec protocol.ServerCodec - if contentType == "" && len(sh.codecs) == 1 { - // If Content-Type is not set and only one codec has been registered, - // then default to that codec. - for _, c := range sh.codecs { - codec = c - } - } else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil { - return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) - } - - return codec, nil -} diff --git a/servlet.go b/servlet.go new file mode 100644 index 0000000..402d5ed --- /dev/null +++ b/servlet.go @@ -0,0 +1,208 @@ +package rpc + +import ( + "fmt" + "sync" + + "git.loafle.net/commons_go/logging" + "git.loafle.net/commons_go/rpc/protocol" +) + +func NewServlet(sh ServletHandler) Servlet { + return &servlet{ + sh: sh, + } +} + +type Servlet interface { + Start(contentType string, reader interface{}, writer interface{}) error + Stop() + + Send(method string, args ...interface{}) (err error) +} + +type servlet struct { + sh ServletHandler + + contentType string + reader interface{} + writer interface{} + serverCodec protocol.ServerCodec + + messageQueueChan chan *messageState + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (s *servlet) Start(contentType string, reader interface{}, writer interface{}) error { + if nil == s.sh { + panic("Servlet: servlet handler must be specified.") + } + s.sh.Validate() + + if s.stopChan != nil { + panic("Servlet: servlet is already running. Stop it before starting it again") + } + + sc, err := s.sh.getCodec(contentType) + if nil != err { + return err + } + + s.contentType = contentType + s.reader = reader + s.writer = writer + s.serverCodec = sc + + if err := s.sh.Init(); nil != err { + logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err)) + } + + s.stopChan = make(chan struct{}) + s.messageQueueChan = make(chan *messageState, s.sh.GetPendingMessages()) + + s.stopWg.Add(1) + go handleServlet(s) + + return nil +} + +func (s *servlet) Stop() { + if s.stopChan == nil { + panic("Server: server must be started before stopping it") + } + close(s.stopChan) + s.stopWg.Wait() + s.stopChan = nil + + s.sh.Destroy() + + s.contentType = "" + s.reader = nil + s.writer = nil + s.serverCodec = nil + + logging.Logger().Info(fmt.Sprintf("Servlet is stopped")) +} + +func (s *servlet) Send(method string, args ...interface{}) (err error) { + ms := retainMessageState(protocol.MessageTypeNotification) + ms.noti.method = method + ms.noti.args = args + + s.messageQueueChan <- ms + + return nil +} + +func handleServlet(s *servlet) { + defer s.stopWg.Done() + + s.stopWg.Add(1) + go handleMessage(s) + + for { + requestCodec, err := s.sh.GetRequest(s.serverCodec, s.reader) + if nil != err { + continue + } + + s.stopWg.Add(1) + go handleRequest(s, requestCodec) + + select { + case <-s.stopChan: + default: + } + } +} + +func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) { + defer func() { + s.stopWg.Done() + }() + + result, err := s.sh.Invoke(requestCodec) + + ms := retainMessageState(protocol.MessageTypeResponse) + ms.res.requestCodec = requestCodec + ms.res.result = result + ms.res.err = err + + s.messageQueueChan <- ms +} + +func handleMessage(s *servlet) { + defer func() { + s.stopWg.Done() + }() + + for { + select { + case ms := <-s.messageQueueChan: + switch ms.messageType { + case protocol.MessageTypeResponse: + if err := s.sh.SendResponse(ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err { + logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) + } + ms.res.requestCodec.Close() + case protocol.MessageTypeNotification: + if err := s.sh.SendNotification(s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err { + logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) + } + default: + } + + releaseMessageState(ms) + case <-s.stopChan: + return + } + } +} + +type messageState struct { + messageType protocol.MessageType + res messageResponse + noti messageNotification +} + +type messageResponse struct { + requestCodec protocol.ServerRequestCodec + result interface{} + err error +} + +type messageNotification struct { + method string + args []interface{} +} + +var messageStatePool sync.Pool + +func retainMessageState(messageType protocol.MessageType) *messageState { + var ms *messageState + v := messageStatePool.Get() + if v == nil { + ms = &messageState{} + } else { + ms = v.(*messageState) + } + + ms.messageType = messageType + + return ms +} + +func releaseMessageState(ms *messageState) { + ms.messageType = protocol.MessageTypeUnknown + + ms.res.requestCodec = nil + ms.res.result = nil + ms.res.err = nil + + ms.noti.method = "" + ms.noti.args = nil + + messageStatePool.Put(ms) +} diff --git a/servlet_handler.go b/servlet_handler.go new file mode 100644 index 0000000..369cdfe --- /dev/null +++ b/servlet_handler.go @@ -0,0 +1,21 @@ +package rpc + +import "git.loafle.net/commons_go/rpc/protocol" + +type ServletHandler interface { + Init() error + + GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) + Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) + SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error + SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error + + Destroy() + + RegisterCodec(contentType string, codec protocol.ServerCodec) + + getCodec(contentType string) (protocol.ServerCodec, error) + + GetPendingMessages() int + Validate() +} diff --git a/servlet_handlers.go b/servlet_handlers.go new file mode 100644 index 0000000..b28d145 --- /dev/null +++ b/servlet_handlers.go @@ -0,0 +1,82 @@ +package rpc + +import ( + "fmt" + "strings" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type ServletHandlers struct { + // The maximum number of pending messages in the queue. + // + // The number of pending requsts should exceed the expected number + // of concurrent goroutines calling client's methods. + // Otherwise a lot of ClientError.Overflow errors may appear. + // + // Default is DefaultPendingMessages. + PendingMessages int + + codecs map[string]protocol.ServerCodec +} + +func (sh *ServletHandlers) Init() error { + return nil +} + +func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) { + return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented") +} + +func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) { + return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented") +} + +func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error { + return fmt.Errorf("Servlet Handler: SendResponse is not implemented") +} + +func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error { + return fmt.Errorf("Servlet Handler: SendNotification is not implemented") +} + +func (sh *ServletHandlers) Destroy() { + // no op +} + +// RegisterCodec adds a new codec to the server. +// +// Codecs are defined to process a given serialization scheme, e.g., JSON or +// XML. A codec is chosen based on the "Content-Type" header from the request, +// excluding the charset definition. +func (sh *ServletHandlers) RegisterCodec(contentType string, codec protocol.ServerCodec) { + if nil == sh.codecs { + sh.codecs = make(map[string]protocol.ServerCodec) + } + sh.codecs[strings.ToLower(contentType)] = codec +} + +func (sh *ServletHandlers) getCodec(contentType string) (protocol.ServerCodec, error) { + var codec protocol.ServerCodec + if contentType == "" && len(sh.codecs) == 1 { + // If Content-Type is not set and only one codec has been registered, + // then default to that codec. + for _, c := range sh.codecs { + codec = c + } + } else if codec = sh.codecs[strings.ToLower(contentType)]; codec == nil { + return nil, fmt.Errorf("Unrecognized Content-Type: %s", contentType) + } + + return codec, nil +} + +func (sh *ServletHandlers) GetPendingMessages() int { + return sh.PendingMessages +} + +func (sh *ServletHandlers) Validate() { + if 0 >= sh.PendingMessages { + sh.PendingMessages = DefaultPendingMessages + } +} diff --git a/transport/socket/servlet_handler.go b/transport/socket/servlet_handler.go new file mode 100644 index 0000000..e532ca5 --- /dev/null +++ b/transport/socket/servlet_handler.go @@ -0,0 +1,7 @@ +package socket + +import "git.loafle.net/commons_go/rpc/server" + +type ServletHandler interface { + server.ServletHandler +} diff --git a/transport/socket/servlet_handlers.go b/transport/socket/servlet_handlers.go new file mode 100644 index 0000000..fd82fda --- /dev/null +++ b/transport/socket/servlet_handlers.go @@ -0,0 +1,45 @@ +package socket + +import ( + "io" + + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/rpc/server" +) + +type ServletHandlers struct { + server.ServletHandlers +} + +func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) { + r := reader.(io.Reader) + requestCodec, err := codec.NewRequest(r) + + return requestCodec, err +} + +func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error { + w := writer.(io.Writer) + + if nil != err { + if lerr := requestCodec.WriteError(w, 500, err); nil != lerr { + + } + } else { + if err := requestCodec.WriteResponse(w, result); nil != err { + + } + } + + return nil +} + +func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error { + w := writer.(io.Writer) + + if err := codec.WriteNotification(w, method, args); nil != err { + + } + + return nil +} diff --git a/transport/websocket/fasthttp/servlet_handler.go b/transport/websocket/fasthttp/servlet_handler.go new file mode 100644 index 0000000..0d65810 --- /dev/null +++ b/transport/websocket/fasthttp/servlet_handler.go @@ -0,0 +1,7 @@ +package fasthttp + +import "git.loafle.net/commons_go/rpc/server" + +type ServletHandler interface { + server.ServletHandler +} diff --git a/transport/websocket/fasthttp/servlet_handlers.go b/transport/websocket/fasthttp/servlet_handlers.go new file mode 100644 index 0000000..bc198e1 --- /dev/null +++ b/transport/websocket/fasthttp/servlet_handlers.go @@ -0,0 +1,58 @@ +package fasthttp + +import ( + "fmt" + + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/rpc/server" + "git.loafle.net/commons_go/websocket_fasthttp/websocket" +) + +type ServletHandlers struct { + server.ServletHandlers +} + +func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) { + conn := reader.(*websocket.Conn) + _, r, err := conn.NextReader() + + requestCodec, err := codec.NewRequest(r) + + return requestCodec, err +} + +func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error { + conn := writer.(*websocket.Conn) + + wc, lerr := conn.NextWriter(websocket.TextMessage) + if nil != lerr { + + } + + if nil != err { + if lerr := requestCodec.WriteError(wc, 500, err); nil != lerr { + + } + } else { + if err := requestCodec.WriteResponse(wc, result); nil != err { + + } + } + + return fmt.Errorf("Servlet Handler: SendResponse is not implemented") +} + +func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error { + conn := writer.(*websocket.Conn) + + wc, lerr := conn.NextWriter(websocket.TextMessage) + if nil != lerr { + + } + + if err := codec.WriteNotification(wc, method, args); nil != err { + + } + + return fmt.Errorf("Servlet Handler: SendNotification is not implemented") +}