This commit is contained in:
crusader 2018-03-22 18:08:37 +09:00
parent 945c47c19a
commit fccf18a26a

View File

@ -34,7 +34,6 @@ type rpcServlet struct {
responseQueueChan chan *responseState responseQueueChan chan *responseState
doneChan chan<- error
conn interface{} conn interface{}
serverCodec protocol.ServerCodec serverCodec protocol.ServerCodec
@ -48,17 +47,17 @@ func (s *rpcServlet) Context() ServletContext {
func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error { func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error {
if nil == s.sh { if nil == s.sh {
panic("Servlet: servlet handler must be specified.") return fmt.Errorf("RPC Servlet: servlet handler must be specified")
} }
s.sh.Validate() s.sh.Validate()
if nil == s.rwcSH { if nil == s.rwcSH {
panic("Servlet: servlet RWC handler must be specified.") return fmt.Errorf("RPC Servlet: servlet RWC handler must be specified")
} }
s.rwcSH.Validate() s.rwcSH.Validate()
if s.stopChan != nil { if s.stopChan != nil {
return fmt.Errorf("Servlet: servlet is already running. Stop it before starting it again") return fmt.Errorf("RPC Servlet: servlet is already running. Stop it before starting it again")
} }
s.ctx = s.sh.ServletContext(parentCTX) s.ctx = s.sh.ServletContext(parentCTX)
@ -67,19 +66,18 @@ func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan cha
return err return err
} }
s.doneChan = doneChan
s.conn = conn s.conn = conn
s.serverCodec = sc s.serverCodec = sc
if err := s.sh.Init(s.ctx); nil != err { if err := s.sh.Init(s.ctx); nil != err {
logging.Logger().Panicf("Servlet: Initialization of servlet has been failed %v", err) return fmt.Errorf("RPC Servlet: Initialization of servlet has been failed %v", err)
} }
s.responseQueueChan = make(chan *responseState, s.sh.GetPendingResponses()) s.responseQueueChan = make(chan *responseState, s.sh.GetPendingResponses())
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
s.stopWg.Add(1) s.stopWg.Add(1)
go handleServlet(s) go handleServlet(s, doneChan)
return nil return nil
} }
@ -100,7 +98,7 @@ func (s *rpcServlet) Send(method string, args ...interface{}) (err error) {
func (s *rpcServlet) Stop() { func (s *rpcServlet) Stop() {
if s.stopChan == nil { if s.stopChan == nil {
logging.Logger().Warnf("Server: server must be started before stopping it") logging.Logger().Warnf("RPC Servlet: RPC Servlet must be started before stopping it")
return return
} }
close(s.stopChan) close(s.stopChan)
@ -113,17 +111,17 @@ func (s *rpcServlet) Stop() {
s.conn = nil s.conn = nil
s.serverCodec = nil s.serverCodec = nil
logging.Logger().Infof("Servlet is stopped") logging.Logger().Infof("RPC Servlet: RPC Servlet is stopped")
} }
func handleServlet(s *rpcServlet) { func handleServlet(s *rpcServlet, doneChan chan<- error) {
var err error var err error
logging.Logger().Infof("Servlet is started") logging.Logger().Infof("RPC Servlet: RPC Servlet is started")
defer func() { defer func() {
s.stopWg.Done() s.stopWg.Done()
s.doneChan <- err doneChan <- err
}() }()
subStopChan := make(chan struct{}) subStopChan := make(chan struct{})
@ -148,38 +146,32 @@ func handleServlet(s *rpcServlet) {
} }
if err != nil { if err != nil {
logging.Logger().Errorf("RPC Server: servlet error %v", err) logging.Logger().Errorf("RPC Servlet: servlet error %v", err)
} }
} }
func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
logging.Logger().Debugf("reader of Servlet is started") logging.Logger().Debugf("RPC Servlet: Reader of Servlet is started")
var err error var err error
defer func() { defer func() {
logging.Logger().Debugf("reader of Servlet is stopped") logging.Logger().Debugf("RPC Servlet: Reader of Servlet is stopped")
if r := recover(); r != nil {
if err == nil {
err = fmt.Errorf("RPC Server: Panic when reading request from client: %v", r)
}
}
doneChan <- err doneChan <- err
}() }()
for { for {
if nil == s.conn { if nil == s.conn {
err = fmt.Errorf("RPC Server: disconnected from client") err = fmt.Errorf("RPC Servlet: Disconnected from client")
return return
} }
requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn) requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn)
if nil != err { if nil != err {
if err == io.ErrUnexpectedEOF || err == io.EOF { if err == io.ErrUnexpectedEOF || err == io.EOF {
err = fmt.Errorf("RPC Server: disconnected from client") err = fmt.Errorf("RPC Server: Disconnected from client")
return return
} }
logging.Logger().Errorf("RPC Server: Cannot read request: [%s]", err) logging.Logger().Errorf("RPC Servlet: Cannot read request: [%s]", err)
continue continue
} }
@ -188,7 +180,7 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
select { select {
case <-stopChan: case <-stopChan:
err = fmt.Errorf("RPC Server: reading request stopped because get stop channel") err = fmt.Errorf("RPC Servlet: Reading request stopped because get stop channel")
return return
default: default:
} }
@ -196,16 +188,11 @@ func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
} }
func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) { func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
logging.Logger().Debugf("writer of Servlet is started") logging.Logger().Debugf("RPC Servlet: Writer of Servlet is started")
var err error var err error
defer func() { defer func() {
logging.Logger().Debugf("writer of Servlet is stopped") logging.Logger().Debugf("RPC Servlet: Writer of Servlet is stopped")
if r := recover(); r != nil {
if err == nil {
err = fmt.Errorf("RPC Server: Panic when writing response to client: %v", r)
}
}
doneChan <- err doneChan <- err
}() }()
@ -220,24 +207,24 @@ func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
select { select {
case <-stopChan: case <-stopChan:
err = fmt.Errorf("RPC Server: writing message stopped because get stop channel") err = fmt.Errorf("RPC Servlet: Writing message stopped because get stop channel")
return return
case rs = <-s.responseQueueChan: case rs = <-s.responseQueueChan:
} }
} }
if nil == s.conn { if nil == s.conn {
err = fmt.Errorf("RPC Server: disconnected from client") err = fmt.Errorf("RPC Servlet: Disconnected from client")
return return
} }
if nil != rs.requestCodec { if nil != rs.requestCodec {
if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err { if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err {
logging.Logger().Errorf("RPC Server: response error %v", err) logging.Logger().Errorf("RPC Servlet: response error %v", err)
} }
} else { } else {
if err := s.rwcSH.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err { if err := s.rwcSH.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err {
logging.Logger().Errorf("RPC Server: notification error %v", err) logging.Logger().Errorf("RPC Servlet: notification error %v", err)
} }
} }