rpc/servlet.go

257 lines
5.2 KiB
Go
Raw Normal View History

2017-11-26 10:15:51 +00:00
package rpc
import (
"fmt"
2017-11-28 09:33:52 +00:00
"io"
"runtime"
2017-11-26 10:15:51 +00:00
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc/protocol"
2017-11-27 16:22:02 +00:00
cuc "git.loafle.net/commons_go/util/context"
2017-11-26 10:15:51 +00:00
)
2017-11-29 09:55:24 +00:00
func NewServlet(sh ServletHandler, rwcSH ServletReadWriteCloseHandler) Servlet {
2017-11-28 16:19:03 +00:00
return &rpcServlet{
2017-11-29 09:55:24 +00:00
sh: sh,
rwcSH: rwcSH,
2017-11-26 10:15:51 +00:00
}
}
2017-11-28 16:40:03 +00:00
type Servlet interface {
2017-11-28 09:51:08 +00:00
Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error
2017-11-26 10:15:51 +00:00
Stop()
Send(method string, args ...interface{}) (err error)
2017-11-27 12:04:25 +00:00
2017-11-28 16:24:16 +00:00
Context() ServletContext
2017-11-26 10:15:51 +00:00
}
2017-11-28 16:19:03 +00:00
type rpcServlet struct {
2017-11-29 09:55:24 +00:00
ctx ServletContext
sh ServletHandler
rwcSH ServletReadWriteCloseHandler
2017-11-28 16:19:03 +00:00
responseQueueChan chan *responseState
2017-11-26 10:15:51 +00:00
2017-11-28 09:51:08 +00:00
doneChan chan<- error
2017-11-28 09:33:52 +00:00
conn interface{}
2017-11-26 10:15:51 +00:00
serverCodec protocol.ServerCodec
stopChan chan struct{}
stopWg sync.WaitGroup
}
2017-11-28 16:19:03 +00:00
func (s *rpcServlet) Start(parentCTX cuc.Context, conn interface{}, doneChan chan<- error) error {
2017-11-26 10:15:51 +00:00
if nil == s.sh {
panic("Servlet: servlet handler must be specified.")
}
s.sh.Validate()
2017-11-29 09:55:24 +00:00
if nil == s.rwcSH {
panic("Servlet: servlet RWC handler must be specified.")
}
s.rwcSH.Validate()
2017-11-26 10:15:51 +00:00
if s.stopChan != nil {
2017-11-27 12:04:25 +00:00
return fmt.Errorf("Servlet: servlet is already running. Stop it before starting it again")
2017-11-26 10:15:51 +00:00
}
2017-11-29 09:55:24 +00:00
s.ctx = s.sh.ServletContext(parentCTX)
2017-11-26 10:15:51 +00:00
2017-11-29 09:55:24 +00:00
sc, err := s.sh.getCodec(s.ctx.GetAttribute(ContentTypeKey).(string))
2017-11-26 10:15:51 +00:00
if nil != err {
return err
}
2017-11-28 09:51:08 +00:00
s.doneChan = doneChan
2017-11-28 09:33:52 +00:00
s.conn = conn
2017-11-26 10:15:51 +00:00
s.serverCodec = sc
2017-11-27 12:04:25 +00:00
if err := s.sh.Init(s.ctx); nil != err {
2017-11-26 10:15:51 +00:00
logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err))
}
s.stopChan = make(chan struct{})
2017-11-28 16:19:03 +00:00
s.responseQueueChan = make(chan *responseState, s.sh.GetPendingResponses())
2017-11-26 10:15:51 +00:00
s.stopWg.Add(1)
2017-11-28 09:33:52 +00:00
go handleServlet(s)
2017-11-26 10:15:51 +00:00
return nil
}
2017-11-28 16:19:03 +00:00
func (s *rpcServlet) Stop() {
2017-11-26 10:15:51 +00:00
if s.stopChan == nil {
panic("Server: server must be started before stopping it")
}
2017-11-29 09:55:24 +00:00
s.sh.Destroy(s.ctx)
2017-11-26 10:15:51 +00:00
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
2017-11-28 16:19:03 +00:00
s.responseQueueChan = nil
2017-11-26 10:15:51 +00:00
2017-11-28 09:33:52 +00:00
s.conn = nil
2017-11-26 10:15:51 +00:00
s.serverCodec = nil
logging.Logger().Info(fmt.Sprintf("Servlet is stopped"))
}
2017-11-28 16:19:03 +00:00
func (s *rpcServlet) Send(method string, args ...interface{}) (err error) {
noti := &notification{
method: method,
args: args,
}
rs := &responseState{
noti: noti,
}
2017-11-26 10:15:51 +00:00
2017-11-28 16:19:03 +00:00
s.responseQueueChan <- rs
2017-11-26 10:15:51 +00:00
return nil
}
2017-11-28 16:24:16 +00:00
func (s *rpcServlet) Context() ServletContext {
2017-11-27 12:04:25 +00:00
return s.ctx
}
2017-11-28 16:19:03 +00:00
func handleServlet(s *rpcServlet) {
2017-11-28 09:51:08 +00:00
var err error
defer func() {
s.doneChan <- err
s.stopWg.Done()
}()
2017-11-26 10:15:51 +00:00
2017-11-28 09:33:52 +00:00
subStopChan := make(chan struct{})
2017-11-27 17:08:53 +00:00
2017-11-28 09:33:52 +00:00
readerDone := make(chan error, 1)
go handleReader(s, subStopChan, readerDone)
writerDone := make(chan error, 1)
go handleWriter(s, subStopChan, writerDone)
select {
case err = <-readerDone:
close(subStopChan)
<-writerDone
case err = <-writerDone:
close(subStopChan)
<-readerDone
case <-s.stopChan:
close(subStopChan)
<-readerDone
<-writerDone
}
if err != nil {
logging.Logger().Error(fmt.Sprintf("RPC Server: servlet error %v", err))
}
}
2017-11-28 16:19:03 +00:00
func handleReader(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
2017-11-28 09:33:52 +00:00
var err error
defer func() {
if r := recover(); r != nil {
if err == nil {
err = fmt.Errorf("RPC Server: Panic when reading request from client: %v", r)
}
}
doneChan <- err
}()
2017-11-26 10:15:51 +00:00
for {
2017-11-29 09:55:24 +00:00
requestCodec, err := s.rwcSH.ReadRequest(s.ctx, s.serverCodec, s.conn)
2017-11-26 10:15:51 +00:00
if nil != err {
2017-11-28 09:33:52 +00:00
if err == io.ErrUnexpectedEOF || err == io.EOF {
err = fmt.Errorf("RPC Server: disconnected from client")
return
}
logging.Logger().Error(fmt.Sprintf("RPC Server: Cannot read request: [%s]", err))
2017-11-26 10:15:51 +00:00
continue
}
s.stopWg.Add(1)
go handleRequest(s, requestCodec)
select {
2017-11-28 09:33:52 +00:00
case <-stopChan:
err = fmt.Errorf("RPC Server: reading request stopped because get stop channel")
return
default:
}
}
}
2017-11-28 16:19:03 +00:00
func handleWriter(s *rpcServlet, stopChan chan struct{}, doneChan chan error) {
2017-11-28 09:33:52 +00:00
var err error
defer func() {
if r := recover(); r != nil {
if err == nil {
2017-11-28 16:19:03 +00:00
err = fmt.Errorf("RPC Server: Panic when writing response to client: %v", r)
2017-11-28 09:33:52 +00:00
}
}
doneChan <- err
}()
for {
2017-11-28 16:19:03 +00:00
var rs *responseState
2017-11-28 09:33:52 +00:00
select {
2017-11-28 16:19:03 +00:00
case rs = <-s.responseQueueChan:
2017-11-28 09:33:52 +00:00
default:
2017-11-28 16:19:03 +00:00
// Give the last chance for ready goroutines filling s.responseQueueChan :)
2017-11-28 09:33:52 +00:00
runtime.Gosched()
select {
case <-stopChan:
err = fmt.Errorf("RPC Server: writing message stopped because get stop channel")
return
2017-11-28 16:19:03 +00:00
case rs = <-s.responseQueueChan:
2017-11-28 09:33:52 +00:00
}
}
2017-11-28 16:19:03 +00:00
if nil != rs.requestCodec {
2017-11-29 09:55:24 +00:00
if err := s.rwcSH.WriteResponse(s.ctx, s.conn, rs.requestCodec, rs.result, rs.err); nil != err {
2017-11-28 16:19:03 +00:00
logging.Logger().Error(fmt.Sprintf("RPC Server: response error %v", err))
2017-11-28 09:33:52 +00:00
}
2017-11-28 16:19:03 +00:00
} else {
2017-11-29 09:55:24 +00:00
if err := s.rwcSH.WriteNotification(s.ctx, s.conn, s.serverCodec, rs.noti.method, rs.noti.args); nil != err {
2017-11-28 16:19:03 +00:00
logging.Logger().Error(fmt.Sprintf("RPC Server: notification error %v", err))
2017-11-28 09:33:52 +00:00
}
2017-11-26 10:15:51 +00:00
}
2017-11-28 09:33:52 +00:00
2017-11-26 10:15:51 +00:00
}
2017-11-28 09:33:52 +00:00
2017-11-26 10:15:51 +00:00
}
2017-11-28 16:19:03 +00:00
func handleRequest(s *rpcServlet, requestCodec protocol.ServerRequestCodec) {
2017-11-26 10:15:51 +00:00
defer func() {
s.stopWg.Done()
}()
2017-11-27 12:04:25 +00:00
result, err := s.sh.Invoke(s.ctx, requestCodec)
2017-11-26 10:15:51 +00:00
2017-11-28 16:19:03 +00:00
rs := &responseState{
requestCodec: requestCodec,
result: result,
err: err,
}
2017-11-26 10:15:51 +00:00
2017-11-28 16:19:03 +00:00
s.responseQueueChan <- rs
2017-11-26 10:15:51 +00:00
}
2017-11-28 16:19:03 +00:00
type responseState struct {
2017-11-26 10:15:51 +00:00
requestCodec protocol.ServerRequestCodec
result interface{}
2017-11-28 16:19:03 +00:00
noti *notification
2017-11-26 10:15:51 +00:00
err error
}
2017-11-28 16:19:03 +00:00
type notification struct {
2017-11-26 10:15:51 +00:00
method string
2017-11-29 09:55:24 +00:00
args interface{}
2017-11-26 10:15:51 +00:00
}