From afd49dd0dca2754a7495d83f075c576d5bcbbd33 Mon Sep 17 00:00:00 2001 From: crusader Date: Wed, 29 Nov 2017 18:55:24 +0900 Subject: [PATCH] ing --- client/client.go | 140 ++++++++++-------- client/client_context.go | 20 +++ client/client_handler.go | 10 +- client/client_handlers.go | 29 +++- client/client_rwc_handler.go | 12 ++ client/client_rwc_handlers.go | 30 ++++ client/request_state.go | 2 +- client/rwc/socket/client_handlers.go | 48 ++++++ .../rwc/websocket/fasthttp/client_handlers.go | 52 +++++++ connection/socket/servlet_handlers.go | 44 ------ server/rwc/socket/servlet_handlers.go | 44 ++++++ .../websocket/fasthttp/servlet_handlers.go | 9 +- servlet.go | 32 ++-- servlet_handler.go | 3 - servlet_handlers.go | 12 -- servlet_rwc_handler.go | 11 ++ servlet_rwc_handlers.go | 26 ++++ 17 files changed, 379 insertions(+), 145 deletions(-) create mode 100644 client/client_context.go create mode 100644 client/client_rwc_handler.go create mode 100644 client/client_rwc_handlers.go create mode 100644 client/rwc/socket/client_handlers.go create mode 100644 client/rwc/websocket/fasthttp/client_handlers.go delete mode 100644 connection/socket/servlet_handlers.go create mode 100644 server/rwc/socket/servlet_handlers.go rename {connection => server/rwc}/websocket/fasthttp/servlet_handlers.go (59%) create mode 100644 servlet_rwc_handler.go create mode 100644 servlet_rwc_handlers.go diff --git a/client/client.go b/client/client.go index b96ced0..5c16597 100644 --- a/client/client.go +++ b/client/client.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "log" - "net" "reflect" "runtime" "sync" @@ -15,9 +14,10 @@ import ( "git.loafle.net/commons_go/rpc/protocol" ) -func New(ch ClientHandler) Client { +func New(ch ClientHandler, rwcHandler ClientReadWriteCloseHandler) Client { c := &client{ - ch: ch, + ch: ch, + rwcHandler: rwcHandler, } return c } @@ -32,9 +32,11 @@ type Client interface { } type client struct { - ch ClientHandler + ctx ClientContext + ch ClientHandler + rwcHandler ClientReadWriteCloseHandler - conn net.Conn + conn interface{} pendingRequestsCount uint32 pendingRequests map[uint64]*RequestState @@ -48,13 +50,27 @@ type client struct { func (c *client) Connect() error { var err error + + if nil == c.ch { + panic("RPC Client: Client handler must be specified.") + } c.ch.Validate() + if nil == c.rwcHandler { + panic("RPC Client: Client RWC handler must be specified.") + } + c.rwcHandler.Validate() + if c.stopChan != nil { panic("RPC Client: the given client is already started. Call Client.Stop() before calling Client.Start() again!") } + c.ctx = c.ch.ClientContext(nil) - if c.conn, err = c.ch.Connect(); nil != err { + if err := c.ch.Init(c.ctx); nil != err { + logging.Logger().Panic(fmt.Sprintf("RPC Client: Initialization of client has been failed %v", err)) + } + + if c.conn, err = c.rwcHandler.Connect(); nil != err { return err } c.stopChan = make(chan struct{}) @@ -68,23 +84,28 @@ func (c *client) Connect() error { func (c *client) Close() { if c.stopChan == nil { - panic("Client: the client must be started before stopping it") + panic("RPC Client: the client must be started before stopping it") } + + c.ch.Destroy(c.ctx) + close(c.stopChan) c.stopWg.Wait() c.stopChan = nil + + logging.Logger().Info(fmt.Sprintf("RPC Client: stopped")) } func (c *client) Send(method string, args ...interface{}) (err error) { - var cs *RequestState - if cs, err = c.send(true, false, nil, method, args...); nil != err { + var rs *RequestState + if rs, err = c.send(true, false, nil, method, args...); nil != err { return } select { - case <-cs.DoneChan: - err = cs.Error - releaseCallState(cs) + case <-rs.DoneChan: + err = rs.Error + releaseCallState(rs) } return @@ -95,19 +116,19 @@ func (c *client) Call(result interface{}, method string, args ...interface{}) er } func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) { - var cs *RequestState - if cs, err = c.send(true, true, result, method, args...); nil != err { + var rs *RequestState + if rs, err = c.send(true, true, result, method, args...); nil != err { return } t := retainTimer(timeout) select { - case <-cs.DoneChan: - result, err = cs.Result, cs.Error - releaseCallState(cs) + case <-rs.DoneChan: + result, err = rs.Result, rs.Error + releaseCallState(rs) case <-t.C: - cs.Cancel() + rs.Cancel() err = getClientTimeoutError(c, timeout) } @@ -116,30 +137,30 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s return } -func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *RequestState, err error) { +func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (rs *RequestState, err error) { if !hasResponse { usePool = true } if usePool { - cs = retainRequestState() + rs = retainRequestState() } else { - cs = &RequestState{} + rs = &RequestState{} } - cs.hasResponse = hasResponse - cs.Method = method - cs.Args = args - cs.DoneChan = make(chan *RequestState, 1) + rs.hasResponse = hasResponse + rs.Method = method + rs.Args = args + rs.DoneChan = make(chan *RequestState, 1) if hasResponse { - cs.ID = c.ch.GetRequestID() - cs.Result = result + rs.ID = c.ch.GetRequestID() + rs.Result = result } select { - case c.requestQueueChan <- cs: - return cs, nil + case c.requestQueueChan <- rs: + return rs, nil default: // Try substituting the oldest async request by the new one // on requests' queue overflow. @@ -149,7 +170,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method // Immediately notify the caller not interested // in the response on requests' queue overflow, since // there are no other ways to notify it later. - releaseCallState(cs) + releaseCallState(rs) return nil, getClientOverflowError(c) } @@ -166,12 +187,12 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method } select { - case c.requestQueueChan <- cs: - return cs, nil + case c.requestQueueChan <- rs: + return rs, nil default: // Release m even if usePool = true, since m wasn't exposed // to the caller yet. - releaseCallState(cs) + releaseCallState(rs) return nil, getClientOverflowError(c) } } @@ -201,7 +222,7 @@ func (c *client) handleRPC() { <-writerDone } - c.conn.Close() + c.rwcHandler.Disconnect(c.conn) if err != nil { //c.LogError("%s", err) @@ -212,11 +233,11 @@ func (c *client) handleRPC() { } } - for _, cs := range c.pendingRequests { + for _, rs := range c.pendingRequests { atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0)) - cs.Error = err - if cs.DoneChan != nil { - cs.Done() + rs.Error = err + if rs.DoneChan != nil { + rs.Done() } } @@ -229,10 +250,10 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { }() for { - var cs *RequestState + var rs *RequestState select { - case cs = <-c.requestQueueChan: + case rs = <-c.requestQueueChan: default: // Give the last chance for ready goroutines filling c.requestsChan :) runtime.Gosched() @@ -240,25 +261,25 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { select { case <-stopChan: return - case cs = <-c.requestQueueChan: + case rs = <-c.requestQueueChan: } } - if cs.IsCanceled() { - if nil != cs.DoneChan { - // cs.Error = ErrCanceled + if rs.IsCanceled() { + if nil != rs.DoneChan { + // rs.Error = ErrCanceled // close(m.done) - cs.Done() + rs.Done() } else { - releaseCallState(cs) + releaseCallState(rs) } continue } - if cs.hasResponse { + if rs.hasResponse { c.pendingRequestsLock.Lock() n := len(c.pendingRequests) - c.pendingRequests[cs.ID] = cs + c.pendingRequests[rs.ID] = rs c.pendingRequestsLock.Unlock() atomic.AddUint32(&c.pendingRequestsCount, 1) @@ -270,13 +291,14 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { } var requestID interface{} - if 0 < cs.ID { - requestID = cs.ID + if 0 < rs.ID { + requestID = rs.ID } - err = c.ch.GetCodec().WriteRequest(c.conn, cs.Method, cs.Args, requestID) - if !cs.hasResponse { - cs.Error = err - cs.Done() + + err = c.rwcHandler.WriteRequest(c.ctx, c.ch.GetCodec(), c.conn, rs.Method, rs.Args, requestID) + if !rs.hasResponse { + rs.Error = err + rs.Done() } if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { @@ -303,7 +325,7 @@ func (c *client) rpcReader(readerDone chan<- error) { }() for { - resCodec, err := c.ch.GetCodec().NewResponse(c.conn) + resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn) if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { logging.Logger().Info("Client: disconnected from server") @@ -332,7 +354,7 @@ func (c *client) handleResponse(resCodec protocol.ClientResponseCodec) error { c.pendingRequestsLock.Lock() id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint() - cs, ok := c.pendingRequests[id] + rs, ok := c.pendingRequests[id] if ok { delete(c.pendingRequests, id) } @@ -344,18 +366,18 @@ func (c *client) handleResponse(resCodec protocol.ClientResponseCodec) error { atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0)) - if err := resCodec.Result(cs.Result); nil != err { + if err := resCodec.Result(rs.Result); nil != err { log.Printf("responseHandle:%v", err) } if err := resCodec.Error(); nil != err { log.Printf("responseHandle:%v", err) - // cs.Error = &ClientError{ + // rs.Error = &ClientError{ // Server: true, // err: fmt.Errorf("gorpc.Client: [%s]. Server error: [%s]", c.Addr, wr.Error), // } } - cs.Done() + rs.Done() return nil } diff --git a/client/client_context.go b/client/client_context.go new file mode 100644 index 0000000..1867dde --- /dev/null +++ b/client/client_context.go @@ -0,0 +1,20 @@ +package client + +import ( + cuc "git.loafle.net/commons_go/util/context" +) + +type ClientContext interface { + cuc.Context +} + +type clientContext struct { + cuc.Context +} + +func newClientContext(parent cuc.Context) ClientContext { + cCTX := &clientContext{} + cCTX.Context = cuc.NewContext(parent) + + return cCTX +} diff --git a/client/client_handler.go b/client/client_handler.go index bd3c946..3fe94a8 100644 --- a/client/client_handler.go +++ b/client/client_handler.go @@ -1,21 +1,23 @@ package client import ( - "net" "time" "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" + cuc "git.loafle.net/commons_go/util/context" ) type ClientHandler interface { - Connect() (net.Conn, error) - GetCodec() protocol.ClientCodec + ClientContext(parent cuc.Context) ClientContext + Init(clientCTX ClientContext) error + Destroy(clientCTX ClientContext) + + GetCodec() protocol.ClientCodec GetRPCRegistry() rpc.Registry GetRequestTimeout() time.Duration GetPendingRequests() int - GetRequestID() uint64 Validate() } diff --git a/client/client_handlers.go b/client/client_handlers.go index 0ae9620..5363c5b 100644 --- a/client/client_handlers.go +++ b/client/client_handlers.go @@ -1,17 +1,19 @@ package client import ( - "errors" - "net" "sync" "time" + "git.loafle.net/commons_go/logging" + "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc/protocol" ) type ClientHandlers struct { - Codec protocol.ClientCodec + Codec protocol.ClientCodec + RPCRegistry rpc.Registry + // Maximum request time. // Default value is DefaultRequestTimeout. RequestTimeout time.Duration @@ -24,14 +26,20 @@ type ClientHandlers struct { // Default is DefaultPendingMessages. PendingRequests int - RPCRegistry rpc.Registry - requestID uint64 requestIDMtx sync.Mutex } -func (ch *ClientHandlers) Connect() (net.Conn, error) { - return nil, errors.New("RPC Client: ClientHandlers method[Connect] is not implement") +func (ch *ClientHandlers) ClientContext(parent cuc.Context) ClientContext { + return newClientContext(parent) +} + +func (ch *ClientHandlers) Init(clientCTX ClientContext) error { + return nil +} + +func (ch *ClientHandlers) Destroy(clientCTX ClientContext) { + // no op } func (ch *ClientHandlers) GetCodec() protocol.ClientCodec { @@ -61,6 +69,13 @@ func (ch *ClientHandlers) GetRequestID() uint64 { } func (ch *ClientHandlers) Validate() { + if nil == ch.Codec { + logging.Logger().Panic("RPC Client Handler: Codec must be specified") + } + if nil == ch.RPCRegistry { + logging.Logger().Panic("RPC Client Handler: RPCRegistry must be specified") + } + if ch.RequestTimeout <= 0 { ch.RequestTimeout = DefaultRequestTimeout } diff --git a/client/client_rwc_handler.go b/client/client_rwc_handler.go new file mode 100644 index 0000000..70c4512 --- /dev/null +++ b/client/client_rwc_handler.go @@ -0,0 +1,12 @@ +package client + +import "git.loafle.net/commons_go/rpc/protocol" + +type ClientReadWriteCloseHandler interface { + Connect() (interface{}, error) + ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) + WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error + Disconnect(conn interface{}) + + Validate() +} diff --git a/client/client_rwc_handlers.go b/client/client_rwc_handlers.go new file mode 100644 index 0000000..aaeb5ae --- /dev/null +++ b/client/client_rwc_handlers.go @@ -0,0 +1,30 @@ +package client + +import ( + "fmt" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type ClientReadWriteCloseHandlers struct { +} + +func (crwch *ClientReadWriteCloseHandlers) Connect() (interface{}, error) { + return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement") +} + +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { + return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[ReadResponse] is not implement") +} + +func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { + return fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[WriteRequest] is not implement") +} + +func (crwch *ClientReadWriteCloseHandlers) Disconnect(conn interface{}) { + // no op +} + +func (crwch *ClientReadWriteCloseHandlers) Validate() { + +} diff --git a/client/request_state.go b/client/request_state.go index b991601..d312c59 100644 --- a/client/request_state.go +++ b/client/request_state.go @@ -11,7 +11,7 @@ var zeroTime time.Time type RequestState struct { ID uint64 Method string - Args interface{} + Args []interface{} Result interface{} Error error DoneChan chan *RequestState diff --git a/client/rwc/socket/client_handlers.go b/client/rwc/socket/client_handlers.go new file mode 100644 index 0000000..09ee546 --- /dev/null +++ b/client/rwc/socket/client_handlers.go @@ -0,0 +1,48 @@ +package socket + +import ( + "fmt" + + "git.loafle.net/commons_go/logging" + "git.loafle.net/commons_go/rpc/client" + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/server" +) + +type ClientReadWriteCloseHandlers struct { + client.ClientReadWriteCloseHandlers + + Address string +} + +func (crwch *ClientReadWriteCloseHandlers) Connect() (interface{}, error) { + return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement") +} + +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { + soc := conn.(server.Socket) + resCodec, err := codec.NewResponse(soc) + + return resCodec, err +} + +func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { + soc := conn.(server.Socket) + + if wErr := codec.WriteRequest(soc, method, params); nil != wErr { + return wErr + } + + return nil +} + +func (crwch *ClientReadWriteCloseHandlers) Disconnect(conn interface{}) { + soc := conn.(server.Socket) + soc.Close() +} + +func (crwch *ClientReadWriteCloseHandlers) Validate() { + if "" == crwch.Address { + logging.Logger().Panic("RPC Client RWC Handler: Address must be specified") + } +} diff --git a/client/rwc/websocket/fasthttp/client_handlers.go b/client/rwc/websocket/fasthttp/client_handlers.go new file mode 100644 index 0000000..ca1e8e5 --- /dev/null +++ b/client/rwc/websocket/fasthttp/client_handlers.go @@ -0,0 +1,52 @@ +package fasthttp + +import ( + "fmt" + + "github.com/gorilla/websocket" + + "git.loafle.net/commons_go/rpc/client" + "git.loafle.net/commons_go/rpc/protocol" + cwf "git.loafle.net/commons_go/websocket_fasthttp" +) + +type ClientReadWriteCloseHandlers struct { + client.ClientReadWriteCloseHandlers +} + +func (crwch *ClientReadWriteCloseHandlers) Connect() (interface{}, error) { + return nil, fmt.Errorf("RPC Client RWC Handler: ClientHandlers method[Connect] is not implement") +} + +func (crwch *ClientReadWriteCloseHandlers) ReadResponse(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}) (protocol.ClientResponseCodec, error) { + soc := conn.(cwf.Socket) + _, r, err := soc.NextReader() + + resCodec, err := codec.NewResponse(r) + + return resCodec, err +} + +func (crwch *ClientReadWriteCloseHandlers) WriteRequest(clientCTX ClientContext, codec protocol.ClientCodec, conn interface{}, method string, params interface{}, id interface{}) error { + soc := conn.(cwf.Socket) + + wc, wErr := soc.NextWriter(websocket.TextMessage) + if nil != wErr { + return wErr + } + + if wErr := codec.WriteRequest(wc, method, params); nil != wErr { + return wErr + } + + return nil +} + +func (crwch *ClientReadWriteCloseHandlers) Disconnect(conn interface{}) { + soc := conn.(cwf.Socket) + soc.Close() +} + +func (crwch *ClientReadWriteCloseHandlers) Validate() { + +} diff --git a/connection/socket/servlet_handlers.go b/connection/socket/servlet_handlers.go deleted file mode 100644 index 5bca25a..0000000 --- a/connection/socket/servlet_handlers.go +++ /dev/null @@ -1,44 +0,0 @@ -package socket - -import ( - "net" - - "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol" -) - -type ServletHandlers struct { -} - -func (sh *ServletHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { - nConn := conn.(net.Conn) - requestCodec, err := codec.NewRequest(nConn) - - return requestCodec, err -} - -func (sh *ServletHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { - nConn := conn.(net.Conn) - - if nil != err { - if wErr := requestCodec.WriteError(nConn, 500, err); nil != wErr { - return wErr - } - } else { - if wErr := requestCodec.WriteResponse(nConn, result); nil != wErr { - return wErr - } - } - - return nil -} - -func (sh *ServletHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { - nConn := conn.(net.Conn) - - if wErr := codec.WriteNotification(nConn, method, args); nil != wErr { - return wErr - } - - return nil -} diff --git a/server/rwc/socket/servlet_handlers.go b/server/rwc/socket/servlet_handlers.go new file mode 100644 index 0000000..f641c1e --- /dev/null +++ b/server/rwc/socket/servlet_handlers.go @@ -0,0 +1,44 @@ +package socket + +import ( + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol" + "git.loafle.net/commons_go/server" +) + +type ServletReadWriteCloseHandler struct { + rpc.ServletReadWriteCloseHandler +} + +func (srwch *ServletReadWriteCloseHandler) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { + soc := conn.(server.Socket) + reqCodec, err := codec.NewRequest(soc) + + return reqCodec, err +} + +func (srwch *ServletReadWriteCloseHandler) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, reqCodec protocol.ServerRequestCodec, result interface{}, err error) error { + soc := conn.(server.Socket) + + if nil != err { + if wErr := reqCodec.WriteError(soc, 500, err); nil != wErr { + return wErr + } + } else { + if wErr := reqCodec.WriteResponse(soc, result); nil != wErr { + return wErr + } + } + + return nil +} + +func (srwch *ServletReadWriteCloseHandler) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error { + soc := conn.(server.Socket) + + if wErr := codec.WriteNotification(soc, method, params); nil != wErr { + return wErr + } + + return nil +} diff --git a/connection/websocket/fasthttp/servlet_handlers.go b/server/rwc/websocket/fasthttp/servlet_handlers.go similarity index 59% rename from connection/websocket/fasthttp/servlet_handlers.go rename to server/rwc/websocket/fasthttp/servlet_handlers.go index e43d1eb..83938a3 100644 --- a/connection/websocket/fasthttp/servlet_handlers.go +++ b/server/rwc/websocket/fasthttp/servlet_handlers.go @@ -7,10 +7,11 @@ import ( "git.loafle.net/commons_go/websocket_fasthttp/websocket" ) -type ServletHandlers struct { +type ServletReadWriteCloseHandler struct { + rpc.ServletReadWriteCloseHandler } -func (sh *ServletHandlers) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { +func (srwch *ServletReadWriteCloseHandler) ReadRequest(servletCTX rpc.ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { soc := conn.(cwf.Socket) _, r, err := soc.NextReader() @@ -19,7 +20,7 @@ func (sh *ServletHandlers) ReadRequest(servletCTX rpc.ServletContext, codec prot return requestCodec, err } -func (sh *ServletHandlers) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { +func (srwch *ServletReadWriteCloseHandler) WriteResponse(servletCTX rpc.ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage) @@ -40,7 +41,7 @@ func (sh *ServletHandlers) WriteResponse(servletCTX rpc.ServletContext, conn int return nil } -func (sh *ServletHandlers) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { +func (srwch *ServletReadWriteCloseHandler) WriteNotification(servletCTX rpc.ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { soc := conn.(cwf.Socket) wc, wErr := soc.NextWriter(websocket.TextMessage) diff --git a/servlet.go b/servlet.go index fcebecc..c4fa917 100644 --- a/servlet.go +++ b/servlet.go @@ -11,9 +11,10 @@ import ( cuc "git.loafle.net/commons_go/util/context" ) -func NewServlet(sh ServletHandler) Servlet { +func NewServlet(sh ServletHandler, rwcSH ServletReadWriteCloseHandler) Servlet { return &rpcServlet{ - sh: sh, + sh: sh, + rwcSH: rwcSH, } } @@ -27,8 +28,10 @@ type Servlet interface { } type rpcServlet struct { - ctx ServletContext - sh ServletHandler + ctx ServletContext + sh ServletHandler + rwcSH ServletReadWriteCloseHandler + responseQueueChan chan *responseState doneChan chan<- error @@ -45,12 +48,17 @@ func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan cha } s.sh.Validate() + if nil == s.rwcSH { + panic("Servlet: servlet RWC handler must be specified.") + } + s.rwcSH.Validate() + if s.stopChan != nil { return fmt.Errorf("Servlet: servlet is already running. Stop it before starting it again") } - servletCTX := s.sh.ServletContext(parentCTX) + s.ctx = s.sh.ServletContext(parentCTX) - sc, err := s.sh.getCodec(servletCTX.GetAttribute(ContentTypeKey).(string)) + sc, err := s.sh.getCodec(s.ctx.GetAttribute(ContentTypeKey).(string)) if nil != err { return err } @@ -76,10 +84,12 @@ func (s *rpcServlet) Stop() { if s.stopChan == nil { panic("Server: server must be started before stopping it") } + + s.sh.Destroy(s.ctx) + close(s.stopChan) s.stopWg.Wait() s.stopChan = nil - s.sh.Destroy(s.ctx) s.responseQueueChan = nil @@ -153,7 +163,7 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { }() for { - requestCodec, err := s.sh.ReadRequest(s.ctx, s.serverCodec, s.conn) + requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn) if nil != err { if err == io.ErrUnexpectedEOF || err == io.EOF { err = fmt.Errorf("RPC Server: disconnected from client") @@ -204,11 +214,11 @@ func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { } if nil != rs.requestCodec { - if err := s.sh.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err { + if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err { logging.Logger().Error(fmt.Sprintf("RPC Server: response error %v", err)) } } else { - if err := s.sh.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err { + if err := s.rwcSH.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err { logging.Logger().Error(fmt.Sprintf("RPC Server: notification error %v", err)) } } @@ -242,5 +252,5 @@ type responseState struct { type notification struct { method string - args []interface{} + args interface{} } diff --git a/servlet_handler.go b/servlet_handler.go index 1fa3c61..4d779e1 100644 --- a/servlet_handler.go +++ b/servlet_handler.go @@ -10,10 +10,7 @@ type ServletHandler interface { Init(servletCTX ServletContext) error - ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) - WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error - WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error Destroy(servletCTX ServletContext) diff --git a/servlet_handlers.go b/servlet_handlers.go index 6e39d7a..1f95a8b 100644 --- a/servlet_handlers.go +++ b/servlet_handlers.go @@ -29,22 +29,10 @@ func (sh *ServletHandlers) Init(servletCTX ServletContext) error { return nil } -func (sh *ServletHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { - return nil, fmt.Errorf("Servlet Handler: ReadRequest is not implemented") -} - func (sh *ServletHandlers) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented") } -func (sh *ServletHandlers) WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { - return fmt.Errorf("Servlet Handler: WriteResponse is not implemented") -} - -func (sh *ServletHandlers) WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, args []interface{}) error { - return fmt.Errorf("Servlet Handler: WriteNotification is not implemented") -} - func (sh *ServletHandlers) Destroy(servletCTX ServletContext) { // no op } diff --git a/servlet_rwc_handler.go b/servlet_rwc_handler.go new file mode 100644 index 0000000..87ec010 --- /dev/null +++ b/servlet_rwc_handler.go @@ -0,0 +1,11 @@ +package rpc + +import "git.loafle.net/commons_go/rpc/protocol" + +type ServletReadWriteCloseHandler interface { + ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) + WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error + WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error + + Validate() +} diff --git a/servlet_rwc_handlers.go b/servlet_rwc_handlers.go new file mode 100644 index 0000000..10738ef --- /dev/null +++ b/servlet_rwc_handlers.go @@ -0,0 +1,26 @@ +package rpc + +import ( + "fmt" + + "git.loafle.net/commons_go/rpc/protocol" +) + +type ServletReadWriteCloseHandlers struct { +} + +func (srwch *ServletReadWriteCloseHandlers) ReadRequest(servletCTX ServletContext, codec protocol.ServerCodec, conn interface{}) (protocol.ServerRequestCodec, error) { + return nil, fmt.Errorf("Servlet RWC Handler: ReadRequest is not implemented") +} + +func (srwch *ServletReadWriteCloseHandlers) WriteResponse(servletCTX ServletContext, conn interface{}, requestCodec protocol.ServerRequestCodec, result interface{}, err error) error { + return fmt.Errorf("Servlet RWC Handler: WriteResponse is not implemented") +} + +func (srwch *ServletReadWriteCloseHandlers) WriteNotification(servletCTX ServletContext, conn interface{}, codec protocol.ServerCodec, method string, params interface{}) error { + return fmt.Errorf("Servlet RWC Handler: WriteNotification is not implemented") +} + +func (srwch *ServletReadWriteCloseHandlers) Validate() { + +}