diff --git a/adapter/http/gzip_encode.go b/adapter/http/gzip_encode.go index ffc779c..91bea62 100644 --- a/adapter/http/gzip_encode.go +++ b/adapter/http/gzip_encode.go @@ -8,7 +8,7 @@ import ( "strings" "unicode" - "git.loafle.net/commons_go/rpc/encode" + "git.loafle.net/commons_go/rpc/codec" ) // gzipWriter writes and closes the gzip writer. @@ -76,12 +76,12 @@ func acceptedEnc(req *http.Request) string { } // Select method selects the correct compression encoder based on http HEADER. -func (_ *CompressionSelector) Select(r *http.Request) Encoder { +func (_ *CompressionSelector) Select(r *http.Request) Codec { switch acceptedEnc(r) { case "gzip": return &gzipEncoder{} case "flate": return &flateEncoder{} } - return encode.DefaultEncoder + return codec.DefaultCodec } diff --git a/servlet.go b/servlet.go index 402d5ed..8679885 100644 --- a/servlet.go +++ b/servlet.go @@ -8,7 +8,7 @@ import ( "git.loafle.net/commons_go/rpc/protocol" ) -func NewServlet(sh ServletHandler) Servlet { +func NewServlet(servletCTX ServletContext, sh ServletHandler) Servlet { return &servlet{ sh: sh, } @@ -19,18 +19,20 @@ type Servlet interface { Stop() Send(method string, args ...interface{}) (err error) + + Context() ServletContext } type servlet struct { - sh ServletHandler + ctx ServletContext + sh ServletHandler + messageQueueChan chan *messageState contentType string reader interface{} writer interface{} serverCodec protocol.ServerCodec - messageQueueChan chan *messageState - stopChan chan struct{} stopWg sync.WaitGroup } @@ -42,7 +44,7 @@ func (s *servlet) Start(contentType string, reader interface{}, writer interface s.sh.Validate() if s.stopChan != nil { - panic("Servlet: servlet is already running. Stop it before starting it again") + return fmt.Errorf("Servlet: servlet is already running. Stop it before starting it again") } sc, err := s.sh.getCodec(contentType) @@ -55,7 +57,7 @@ func (s *servlet) Start(contentType string, reader interface{}, writer interface s.writer = writer s.serverCodec = sc - if err := s.sh.Init(); nil != err { + if err := s.sh.Init(s.ctx); nil != err { logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err)) } @@ -75,8 +77,9 @@ func (s *servlet) Stop() { close(s.stopChan) s.stopWg.Wait() s.stopChan = nil + s.sh.Destroy(s.ctx) - s.sh.Destroy() + s.messageQueueChan = nil s.contentType = "" s.reader = nil @@ -96,6 +99,10 @@ func (s *servlet) Send(method string, args ...interface{}) (err error) { return nil } +func (s *servlet) Context() ServletContext { + return s.ctx +} + func handleServlet(s *servlet) { defer s.stopWg.Done() @@ -103,7 +110,7 @@ func handleServlet(s *servlet) { go handleMessage(s) for { - requestCodec, err := s.sh.GetRequest(s.serverCodec, s.reader) + requestCodec, err := s.sh.GetRequest(s.ctx, s.serverCodec, s.reader) if nil != err { continue } @@ -123,7 +130,7 @@ func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) { s.stopWg.Done() }() - result, err := s.sh.Invoke(requestCodec) + result, err := s.sh.Invoke(s.ctx, requestCodec) ms := retainMessageState(protocol.MessageTypeResponse) ms.res.requestCodec = requestCodec @@ -143,12 +150,12 @@ func handleMessage(s *servlet) { case ms := <-s.messageQueueChan: switch ms.messageType { case protocol.MessageTypeResponse: - if err := s.sh.SendResponse(ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err { + if err := s.sh.SendResponse(s.ctx, ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err { logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) } ms.res.requestCodec.Close() case protocol.MessageTypeNotification: - if err := s.sh.SendNotification(s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err { + if err := s.sh.SendNotification(s.ctx, s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err { logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) } default: diff --git a/servlet_context.go b/servlet_context.go new file mode 100644 index 0000000..606eb51 --- /dev/null +++ b/servlet_context.go @@ -0,0 +1,20 @@ +package rpc + +import ( + cuc "git.loafle.net/commons_go/util/context" +) + +type ServletContext interface { + cuc.Context +} + +type servletContext struct { + cuc.Context +} + +func NewServletContext(parent cuc.Context) ServletContext { + sCTX := &servletContext{} + sCTX.Context = cuc.NewContext(parent) + + return sCTX +} diff --git a/servlet_handler.go b/servlet_handler.go index 369cdfe..6ad7776 100644 --- a/servlet_handler.go +++ b/servlet_handler.go @@ -3,14 +3,14 @@ package rpc import "git.loafle.net/commons_go/rpc/protocol" type ServletHandler interface { - Init() error + Init(servletCTX ServletContext) error - GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) - Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) - SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error - SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error + GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) + Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) + SendResponse(servletCTX ServletContext, requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error + SendNotification(servletCTX ServletContext, codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error - Destroy() + Destroy(servletCTX ServletContext) RegisterCodec(contentType string, codec protocol.ServerCodec) diff --git a/servlet_handlers.go b/servlet_handlers.go index b28d145..6bb4250 100644 --- a/servlet_handlers.go +++ b/servlet_handlers.go @@ -20,27 +20,27 @@ type ServletHandlers struct { codecs map[string]protocol.ServerCodec } -func (sh *ServletHandlers) Init() error { +func (sh *ServletHandlers) Init(servletCTX ServletContext) error { return nil } -func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) { +func (sh *ServletHandlers) GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) { return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented") } -func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) { +func (sh *ServletHandlers) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) { return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented") } -func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error { +func (sh *ServletHandlers) SendResponse(servletCTX ServletContext, requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error { return fmt.Errorf("Servlet Handler: SendResponse is not implemented") } -func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error { +func (sh *ServletHandlers) SendNotification(servletCTX ServletContext, codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error { return fmt.Errorf("Servlet Handler: SendNotification is not implemented") } -func (sh *ServletHandlers) Destroy() { +func (sh *ServletHandlers) Destroy(servletCTX ServletContext) { // no op }