2017-11-26 10:15:51 +00:00
|
|
|
package rpc
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2017-11-28 09:33:52 +00:00
|
|
|
"io"
|
|
|
|
"runtime"
|
2017-11-26 10:15:51 +00:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
"git.loafle.net/commons_go/logging"
|
|
|
|
"git.loafle.net/commons_go/rpc/protocol"
|
2017-11-27 16:22:02 +00:00
|
|
|
cuc "git.loafle.net/commons_go/util/context"
|
2017-11-26 10:15:51 +00:00
|
|
|
)
|
|
|
|
|
2017-11-29 09:55:24 +00:00
|
|
|
func NewServlet(sh ServletHandler, rwcSH ServletReadWriteCloseHandler) Servlet {
|
2017-11-28 16:19:03 +00:00
|
|
|
return &rpcServlet{
|
2017-11-29 09:55:24 +00:00
|
|
|
sh: sh,
|
|
|
|
rwcSH: rwcSH,
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:40:03 +00:00
|
|
|
type Servlet interface {
|
2017-11-28 09:51:08 +00:00
|
|
|
Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error
|
2017-11-26 10:15:51 +00:00
|
|
|
Stop()
|
|
|
|
|
|
|
|
Send(method string, args ...interface{}) (err error)
|
2017-11-27 12:04:25 +00:00
|
|
|
|
2017-11-28 16:24:16 +00:00
|
|
|
Context() ServletContext
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
type rpcServlet struct {
|
2017-11-29 09:55:24 +00:00
|
|
|
ctx ServletContext
|
|
|
|
sh ServletHandler
|
|
|
|
rwcSH ServletReadWriteCloseHandler
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
responseQueueChan chan *responseState
|
2017-11-26 10:15:51 +00:00
|
|
|
|
2017-11-28 09:33:52 +00:00
|
|
|
conn interface{}
|
2017-11-26 10:15:51 +00:00
|
|
|
serverCodec protocol.ServerCodec
|
|
|
|
|
|
|
|
stopChan chan struct{}
|
|
|
|
stopWg sync.WaitGroup
|
2018-03-22 16:48:16 +00:00
|
|
|
|
|
|
|
sendMtx sync.RWMutex
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
|
|
|
|
2018-03-09 09:11:03 +00:00
|
|
|
func (s *rpcServlet) Context() ServletContext {
|
|
|
|
return s.ctx
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error {
|
2017-11-26 10:15:51 +00:00
|
|
|
if nil == s.sh {
|
2018-03-22 09:08:37 +00:00
|
|
|
return fmt.Errorf("RPC Servlet: servlet handler must be specified")
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
|
|
|
s.sh.Validate()
|
|
|
|
|
2017-11-29 09:55:24 +00:00
|
|
|
if nil == s.rwcSH {
|
2018-03-22 09:08:37 +00:00
|
|
|
return fmt.Errorf("RPC Servlet: servlet RWC handler must be specified")
|
2017-11-29 09:55:24 +00:00
|
|
|
}
|
|
|
|
s.rwcSH.Validate()
|
|
|
|
|
2017-11-26 10:15:51 +00:00
|
|
|
if s.stopChan != nil {
|
2018-03-22 09:08:37 +00:00
|
|
|
return fmt.Errorf("RPC Servlet: servlet is already running. Stop it before starting it again")
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
2017-11-29 09:55:24 +00:00
|
|
|
s.ctx = s.sh.ServletContext(parentCTX)
|
2017-11-26 10:15:51 +00:00
|
|
|
|
2017-11-29 09:55:24 +00:00
|
|
|
sc, err := s.sh.getCodec(s.ctx.GetAttribute(ContentTypeKey).(string))
|
2017-11-26 10:15:51 +00:00
|
|
|
if nil != err {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-11-28 09:33:52 +00:00
|
|
|
s.conn = conn
|
2017-11-26 10:15:51 +00:00
|
|
|
s.serverCodec = sc
|
|
|
|
|
2017-11-27 12:04:25 +00:00
|
|
|
if err := s.sh.Init(s.ctx); nil != err {
|
2018-03-22 09:08:37 +00:00
|
|
|
return fmt.Errorf("RPC Servlet: Initialization of servlet has been failed %v", err)
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
2017-12-01 07:29:32 +00:00
|
|
|
s.responseQueueChan = make(chan *responseState, s.sh.GetPendingResponses())
|
2017-11-26 10:15:51 +00:00
|
|
|
|
|
|
|
s.stopChan = make(chan struct{})
|
|
|
|
|
|
|
|
s.stopWg.Add(1)
|
2018-03-22 09:08:37 +00:00
|
|
|
go handleServlet(s, doneChan)
|
2017-11-26 10:15:51 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
func (s *rpcServlet) Send(method string, args ...interface{}) (err error) {
|
2018-03-22 16:48:16 +00:00
|
|
|
s.sendMtx.RLock()
|
2017-11-28 16:19:03 +00:00
|
|
|
noti := ¬ification{
|
|
|
|
method: method,
|
|
|
|
args: args,
|
|
|
|
}
|
|
|
|
rs := &responseState{
|
|
|
|
noti: noti,
|
|
|
|
}
|
2017-11-26 10:15:51 +00:00
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
s.responseQueueChan <- rs
|
2018-03-22 16:48:16 +00:00
|
|
|
s.sendMtx.RUnlock()
|
2017-11-26 10:15:51 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-03-09 09:11:03 +00:00
|
|
|
func (s *rpcServlet) Stop() {
|
|
|
|
if s.stopChan == nil {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Warnf("RPC Servlet: RPC Servlet must be started before stopping it")
|
2018-03-09 09:11:03 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
close(s.stopChan)
|
2017-11-30 07:15:20 +00:00
|
|
|
s.stopWg.Wait()
|
|
|
|
|
|
|
|
s.sh.Destroy(s.ctx)
|
|
|
|
|
|
|
|
s.stopChan = nil
|
|
|
|
s.responseQueueChan = nil
|
|
|
|
s.conn = nil
|
|
|
|
s.serverCodec = nil
|
|
|
|
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Infof("RPC Servlet: RPC Servlet is stopped")
|
2017-11-30 07:15:20 +00:00
|
|
|
}
|
|
|
|
|
2018-03-22 09:08:37 +00:00
|
|
|
func handleServlet(s *rpcServlet, doneChan chan<- error) {
|
2017-11-28 09:51:08 +00:00
|
|
|
var err error
|
|
|
|
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Infof("RPC Servlet: RPC Servlet is started")
|
2017-11-30 06:43:19 +00:00
|
|
|
|
2017-11-28 09:51:08 +00:00
|
|
|
defer func() {
|
|
|
|
s.stopWg.Done()
|
2018-03-22 09:08:37 +00:00
|
|
|
doneChan <- err
|
2017-11-28 09:51:08 +00:00
|
|
|
}()
|
2017-11-26 10:15:51 +00:00
|
|
|
|
2017-11-28 09:33:52 +00:00
|
|
|
subStopChan := make(chan struct{})
|
2017-11-27 17:08:53 +00:00
|
|
|
|
2017-11-28 09:33:52 +00:00
|
|
|
readerDone := make(chan error, 1)
|
|
|
|
go handleReader(s, subStopChan, readerDone)
|
|
|
|
|
|
|
|
writerDone := make(chan error, 1)
|
|
|
|
go handleWriter(s, subStopChan, writerDone)
|
|
|
|
|
|
|
|
select {
|
|
|
|
case err = <-readerDone:
|
|
|
|
close(subStopChan)
|
|
|
|
<-writerDone
|
|
|
|
case err = <-writerDone:
|
|
|
|
close(subStopChan)
|
|
|
|
<-readerDone
|
|
|
|
case <-s.stopChan:
|
|
|
|
close(subStopChan)
|
|
|
|
<-readerDone
|
|
|
|
<-writerDone
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Errorf("RPC Servlet: servlet error %v", err)
|
2017-11-28 09:33:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Debugf("RPC Servlet: Reader of Servlet is started")
|
2017-11-30 06:43:19 +00:00
|
|
|
|
2017-11-28 09:33:52 +00:00
|
|
|
var err error
|
|
|
|
defer func() {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Debugf("RPC Servlet: Reader of Servlet is stopped")
|
2017-11-28 09:33:52 +00:00
|
|
|
doneChan <- err
|
|
|
|
}()
|
2017-11-26 10:15:51 +00:00
|
|
|
|
|
|
|
for {
|
2017-11-30 04:59:46 +00:00
|
|
|
if nil == s.conn {
|
2018-03-22 09:08:37 +00:00
|
|
|
err = fmt.Errorf("RPC Servlet: Disconnected from client")
|
2017-11-30 04:59:46 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-11-29 09:55:24 +00:00
|
|
|
requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn)
|
2017-11-26 10:15:51 +00:00
|
|
|
if nil != err {
|
2017-11-28 09:33:52 +00:00
|
|
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
2018-03-22 09:08:37 +00:00
|
|
|
err = fmt.Errorf("RPC Server: Disconnected from client")
|
2017-11-28 09:33:52 +00:00
|
|
|
return
|
|
|
|
}
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Errorf("RPC Servlet: Cannot read request: [%s]", err)
|
2017-11-26 10:15:51 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
s.stopWg.Add(1)
|
|
|
|
go handleRequest(s, requestCodec)
|
|
|
|
|
|
|
|
select {
|
2017-11-28 09:33:52 +00:00
|
|
|
case <-stopChan:
|
2018-03-22 09:08:37 +00:00
|
|
|
err = fmt.Errorf("RPC Servlet: Reading request stopped because get stop channel")
|
2017-11-28 09:33:52 +00:00
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Debugf("RPC Servlet: Writer of Servlet is started")
|
2017-11-30 06:43:19 +00:00
|
|
|
|
2017-11-28 09:33:52 +00:00
|
|
|
var err error
|
|
|
|
defer func() {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Debugf("RPC Servlet: Writer of Servlet is stopped")
|
2017-11-28 09:33:52 +00:00
|
|
|
doneChan <- err
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
2017-11-28 16:19:03 +00:00
|
|
|
var rs *responseState
|
2017-11-28 09:33:52 +00:00
|
|
|
|
|
|
|
select {
|
2017-11-28 16:19:03 +00:00
|
|
|
case rs = <-s.responseQueueChan:
|
2017-11-28 09:33:52 +00:00
|
|
|
default:
|
2017-11-28 16:19:03 +00:00
|
|
|
// Give the last chance for ready goroutines filling s.responseQueueChan :)
|
2017-11-28 09:33:52 +00:00
|
|
|
runtime.Gosched()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-stopChan:
|
2018-03-22 09:08:37 +00:00
|
|
|
err = fmt.Errorf("RPC Servlet: Writing message stopped because get stop channel")
|
2017-11-28 09:33:52 +00:00
|
|
|
return
|
2017-11-28 16:19:03 +00:00
|
|
|
case rs = <-s.responseQueueChan:
|
2017-11-28 09:33:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-30 04:59:46 +00:00
|
|
|
if nil == s.conn {
|
2018-03-22 09:08:37 +00:00
|
|
|
err = fmt.Errorf("RPC Servlet: Disconnected from client")
|
2017-11-30 04:59:46 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
if nil != rs.requestCodec {
|
2017-11-29 09:55:24 +00:00
|
|
|
if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Errorf("RPC Servlet: response error %v", err)
|
2017-11-28 09:33:52 +00:00
|
|
|
}
|
2017-11-28 16:19:03 +00:00
|
|
|
} else {
|
2017-11-29 09:55:24 +00:00
|
|
|
if err := s.rwcSH.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err {
|
2018-03-22 09:08:37 +00:00
|
|
|
logging.Logger().Errorf("RPC Servlet: notification error %v", err)
|
2017-11-28 09:33:52 +00:00
|
|
|
}
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
2017-11-28 09:33:52 +00:00
|
|
|
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
2017-11-28 09:33:52 +00:00
|
|
|
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
func handleRequest(s *rpcServlet, requestCodec protocol.ServerRequestCodec) {
|
2017-11-26 10:15:51 +00:00
|
|
|
defer func() {
|
|
|
|
s.stopWg.Done()
|
|
|
|
}()
|
|
|
|
|
2017-11-27 12:04:25 +00:00
|
|
|
result, err := s.sh.Invoke(s.ctx, requestCodec)
|
2017-11-26 10:15:51 +00:00
|
|
|
|
2018-03-22 15:41:28 +00:00
|
|
|
if !requestCodec.HasResponse() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
rs := &responseState{
|
|
|
|
requestCodec: requestCodec,
|
|
|
|
result: result,
|
|
|
|
err: err,
|
|
|
|
}
|
2017-11-26 10:15:51 +00:00
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
s.responseQueueChan <- rs
|
2017-11-26 10:15:51 +00:00
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
type responseState struct {
|
2017-11-26 10:15:51 +00:00
|
|
|
requestCodec protocol.ServerRequestCodec
|
|
|
|
result interface{}
|
2017-11-28 16:19:03 +00:00
|
|
|
noti *notification
|
2017-11-26 10:15:51 +00:00
|
|
|
err error
|
|
|
|
}
|
|
|
|
|
2017-11-28 16:19:03 +00:00
|
|
|
// notification is a server-initiated message queued by Send.
type notification struct {
	// method is the notification's method name.
	method string
	// args are the arguments passed along with the method.
	args []interface{}
}
|