This commit is contained in:
crusader 2017-11-27 21:04:25 +09:00
parent fa7025303d
commit 84676e199e
5 changed files with 53 additions and 26 deletions

View File

@ -8,7 +8,7 @@ import (
"strings" "strings"
"unicode" "unicode"
"git.loafle.net/commons_go/rpc/encode" "git.loafle.net/commons_go/rpc/codec"
) )
// gzipWriter writes and closes the gzip writer. // gzipWriter writes and closes the gzip writer.
@ -76,12 +76,12 @@ func acceptedEnc(req *http.Request) string {
} }
// Select method selects the correct compression encoder based on http HEADER. // Select method selects the correct compression encoder based on http HEADER.
func (_ *CompressionSelector) Select(r *http.Request) Encoder { func (_ *CompressionSelector) Select(r *http.Request) Codec {
switch acceptedEnc(r) { switch acceptedEnc(r) {
case "gzip": case "gzip":
return &gzipEncoder{} return &gzipEncoder{}
case "flate": case "flate":
return &flateEncoder{} return &flateEncoder{}
} }
return encode.DefaultEncoder return codec.DefaultCodec
} }

View File

@ -8,7 +8,7 @@ import (
"git.loafle.net/commons_go/rpc/protocol" "git.loafle.net/commons_go/rpc/protocol"
) )
func NewServlet(sh ServletHandler) Servlet { func NewServlet(servletCTX ServletContext, sh ServletHandler) Servlet {
return &servlet{ return &servlet{
sh: sh, sh: sh,
} }
@ -19,18 +19,20 @@ type Servlet interface {
Stop() Stop()
Send(method string, args ...interface{}) (err error) Send(method string, args ...interface{}) (err error)
Context() ServletContext
} }
type servlet struct { type servlet struct {
ctx ServletContext
sh ServletHandler sh ServletHandler
messageQueueChan chan *messageState
contentType string contentType string
reader interface{} reader interface{}
writer interface{} writer interface{}
serverCodec protocol.ServerCodec serverCodec protocol.ServerCodec
messageQueueChan chan *messageState
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
} }
@ -42,7 +44,7 @@ func (s *servlet) Start(contentType string, reader interface{}, writer interface
s.sh.Validate() s.sh.Validate()
if s.stopChan != nil { if s.stopChan != nil {
panic("Servlet: servlet is already running. Stop it before starting it again") return fmt.Errorf("Servlet: servlet is already running. Stop it before starting it again")
} }
sc, err := s.sh.getCodec(contentType) sc, err := s.sh.getCodec(contentType)
@ -55,7 +57,7 @@ func (s *servlet) Start(contentType string, reader interface{}, writer interface
s.writer = writer s.writer = writer
s.serverCodec = sc s.serverCodec = sc
if err := s.sh.Init(); nil != err { if err := s.sh.Init(s.ctx); nil != err {
logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err)) logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err))
} }
@ -75,8 +77,9 @@ func (s *servlet) Stop() {
close(s.stopChan) close(s.stopChan)
s.stopWg.Wait() s.stopWg.Wait()
s.stopChan = nil s.stopChan = nil
s.sh.Destroy(s.ctx)
s.sh.Destroy() s.messageQueueChan = nil
s.contentType = "" s.contentType = ""
s.reader = nil s.reader = nil
@ -96,6 +99,10 @@ func (s *servlet) Send(method string, args ...interface{}) (err error) {
return nil return nil
} }
func (s *servlet) Context() ServletContext {
return s.ctx
}
func handleServlet(s *servlet) { func handleServlet(s *servlet) {
defer s.stopWg.Done() defer s.stopWg.Done()
@ -103,7 +110,7 @@ func handleServlet(s *servlet) {
go handleMessage(s) go handleMessage(s)
for { for {
requestCodec, err := s.sh.GetRequest(s.serverCodec, s.reader) requestCodec, err := s.sh.GetRequest(s.ctx, s.serverCodec, s.reader)
if nil != err { if nil != err {
continue continue
} }
@ -123,7 +130,7 @@ func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) {
s.stopWg.Done() s.stopWg.Done()
}() }()
result, err := s.sh.Invoke(requestCodec) result, err := s.sh.Invoke(s.ctx, requestCodec)
ms := retainMessageState(protocol.MessageTypeResponse) ms := retainMessageState(protocol.MessageTypeResponse)
ms.res.requestCodec = requestCodec ms.res.requestCodec = requestCodec
@ -143,12 +150,12 @@ func handleMessage(s *servlet) {
case ms := <-s.messageQueueChan: case ms := <-s.messageQueueChan:
switch ms.messageType { switch ms.messageType {
case protocol.MessageTypeResponse: case protocol.MessageTypeResponse:
if err := s.sh.SendResponse(ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err { if err := s.sh.SendResponse(s.ctx, ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err {
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
} }
ms.res.requestCodec.Close() ms.res.requestCodec.Close()
case protocol.MessageTypeNotification: case protocol.MessageTypeNotification:
if err := s.sh.SendNotification(s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err { if err := s.sh.SendNotification(s.ctx, s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err {
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err)) logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
} }
default: default:

20
servlet_context.go Normal file
View File

@ -0,0 +1,20 @@
package rpc
import (
cuc "git.loafle.net/commons_go/util/context"
)
type ServletContext interface {
cuc.Context
}
type servletContext struct {
cuc.Context
}
func NewServletContext(parent cuc.Context) ServletContext {
sCTX := &servletContext{}
sCTX.Context = cuc.NewContext(parent)
return sCTX
}

View File

@ -3,14 +3,14 @@ package rpc
import "git.loafle.net/commons_go/rpc/protocol" import "git.loafle.net/commons_go/rpc/protocol"
type ServletHandler interface { type ServletHandler interface {
Init() error Init(servletCTX ServletContext) error
GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error)
Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error)
SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error SendResponse(servletCTX ServletContext, requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error
SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error SendNotification(servletCTX ServletContext, codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error
Destroy() Destroy(servletCTX ServletContext)
RegisterCodec(contentType string, codec protocol.ServerCodec) RegisterCodec(contentType string, codec protocol.ServerCodec)

View File

@ -20,27 +20,27 @@ type ServletHandlers struct {
codecs map[string]protocol.ServerCodec codecs map[string]protocol.ServerCodec
} }
func (sh *ServletHandlers) Init() error { func (sh *ServletHandlers) Init(servletCTX ServletContext) error {
return nil return nil
} }
func (sh *ServletHandlers) GetRequest(codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) { func (sh *ServletHandlers) GetRequest(servletCTX ServletContext, codec protocol.ServerCodec, reader interface{}) (protocol.ServerRequestCodec, error) {
return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented") return nil, fmt.Errorf("Servlet Handler: GetRequest is not implemented")
} }
func (sh *ServletHandlers) Invoke(requestCodec protocol.RegistryCodec) (result interface{}, err error) { func (sh *ServletHandlers) Invoke(servletCTX ServletContext, requestCodec protocol.RegistryCodec) (result interface{}, err error) {
return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented") return nil, fmt.Errorf("Servlet Handler: Invoke is not implemented")
} }
func (sh *ServletHandlers) SendResponse(requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error { func (sh *ServletHandlers) SendResponse(servletCTX ServletContext, requestCodec protocol.ServerRequestCodec, writer interface{}, result interface{}, err error) error {
return fmt.Errorf("Servlet Handler: SendResponse is not implemented") return fmt.Errorf("Servlet Handler: SendResponse is not implemented")
} }
func (sh *ServletHandlers) SendNotification(codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error { func (sh *ServletHandlers) SendNotification(servletCTX ServletContext, codec protocol.ServerCodec, writer interface{}, method string, args ...interface{}) error {
return fmt.Errorf("Servlet Handler: SendNotification is not implemented") return fmt.Errorf("Servlet Handler: SendNotification is not implemented")
} }
func (sh *ServletHandlers) Destroy() { func (sh *ServletHandlers) Destroy(servletCTX ServletContext) {
// no op // no op
} }