rpc/servlet.go

217 lines
4.4 KiB
Go
Raw Normal View History

2017-11-26 10:15:51 +00:00
package rpc
import (
"fmt"
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/commons_go/rpc/protocol"
2017-11-27 16:22:02 +00:00
cuc "git.loafle.net/commons_go/util/context"
2017-11-26 10:15:51 +00:00
)
2017-11-27 16:22:02 +00:00
func NewServlet(sh ServletHandler) Servlet {
2017-11-26 10:15:51 +00:00
return &servlet{
sh: sh,
}
}
type Servlet interface {
2017-11-27 16:22:02 +00:00
Start(parentCTX cuc.Context, reader interface{}, writer interface{}) error
2017-11-26 10:15:51 +00:00
Stop()
Send(method string, args ...interface{}) (err error)
2017-11-27 12:04:25 +00:00
Context() ServletContext
2017-11-26 10:15:51 +00:00
}
type servlet struct {
2017-11-27 12:04:25 +00:00
ctx ServletContext
sh ServletHandler
messageQueueChan chan *messageState
2017-11-26 10:15:51 +00:00
reader interface{}
writer interface{}
serverCodec protocol.ServerCodec
stopChan chan struct{}
stopWg sync.WaitGroup
}
2017-11-27 17:08:53 +00:00
func (s *servlet) Start(parentCTX cuc.Context, reader interface{}, writer interface{}, doneChan chan<- struct{}) error {
2017-11-26 10:15:51 +00:00
if nil == s.sh {
panic("Servlet: servlet handler must be specified.")
}
s.sh.Validate()
if s.stopChan != nil {
2017-11-27 12:04:25 +00:00
return fmt.Errorf("Servlet: servlet is already running. Stop it before starting it again")
2017-11-26 10:15:51 +00:00
}
2017-11-27 16:22:02 +00:00
servletCTX := s.sh.ServletContext(parentCTX)
2017-11-26 10:15:51 +00:00
2017-11-27 16:22:02 +00:00
sc, err := s.sh.getCodec(servletCTX.GetAttribute(ContentTypeKey).(string))
2017-11-26 10:15:51 +00:00
if nil != err {
return err
}
s.reader = reader
s.writer = writer
s.serverCodec = sc
2017-11-27 12:04:25 +00:00
if err := s.sh.Init(s.ctx); nil != err {
2017-11-26 10:15:51 +00:00
logging.Logger().Panic(fmt.Sprintf("Servlet: Initialization of servlet has been failed %v", err))
}
s.stopChan = make(chan struct{})
s.messageQueueChan = make(chan *messageState, s.sh.GetPendingMessages())
s.stopWg.Add(1)
2017-11-27 17:08:53 +00:00
go handleServlet(s, doneChan)
2017-11-26 10:15:51 +00:00
return nil
}
func (s *servlet) Stop() {
if s.stopChan == nil {
panic("Server: server must be started before stopping it")
}
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
2017-11-27 12:04:25 +00:00
s.sh.Destroy(s.ctx)
2017-11-26 10:15:51 +00:00
2017-11-27 12:04:25 +00:00
s.messageQueueChan = nil
2017-11-26 10:15:51 +00:00
s.reader = nil
s.writer = nil
s.serverCodec = nil
logging.Logger().Info(fmt.Sprintf("Servlet is stopped"))
}
func (s *servlet) Send(method string, args ...interface{}) (err error) {
ms := retainMessageState(protocol.MessageTypeNotification)
ms.noti.method = method
ms.noti.args = args
s.messageQueueChan <- ms
return nil
}
2017-11-27 12:04:25 +00:00
func (s *servlet) Context() ServletContext {
return s.ctx
}
2017-11-27 17:08:53 +00:00
func handleServlet(s *servlet, doneChan chan<- struct{}) {
2017-11-26 10:15:51 +00:00
defer s.stopWg.Done()
2017-11-27 17:08:53 +00:00
messageStopChan := make(chan struct{})
messageDoneChan := make(chan struct{})
go handleMessage(s, messageStopChan, messageDoneChan)
2017-11-26 10:15:51 +00:00
for {
2017-11-27 12:04:25 +00:00
requestCodec, err := s.sh.GetRequest(s.ctx, s.serverCodec, s.reader)
2017-11-26 10:15:51 +00:00
if nil != err {
continue
}
s.stopWg.Add(1)
go handleRequest(s, requestCodec)
select {
case <-s.stopChan:
default:
}
}
}
func handleRequest(s *servlet, requestCodec protocol.ServerRequestCodec) {
defer func() {
s.stopWg.Done()
}()
2017-11-27 12:04:25 +00:00
result, err := s.sh.Invoke(s.ctx, requestCodec)
2017-11-26 10:15:51 +00:00
ms := retainMessageState(protocol.MessageTypeResponse)
ms.res.requestCodec = requestCodec
ms.res.result = result
ms.res.err = err
s.messageQueueChan <- ms
}
func handleMessage(s *servlet) {
defer func() {
s.stopWg.Done()
}()
for {
select {
case ms := <-s.messageQueueChan:
switch ms.messageType {
case protocol.MessageTypeResponse:
2017-11-27 12:04:25 +00:00
if err := s.sh.SendResponse(s.ctx, ms.res.requestCodec, s.writer, ms.res.result, ms.res.err); nil != err {
2017-11-26 10:15:51 +00:00
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
}
ms.res.requestCodec.Close()
case protocol.MessageTypeNotification:
2017-11-27 12:04:25 +00:00
if err := s.sh.SendNotification(s.ctx, s.serverCodec, s.writer, ms.noti.method, ms.noti.args...); nil != err {
2017-11-26 10:15:51 +00:00
logging.Logger().Error(fmt.Sprintf("RPC Server: response message error %v", err))
}
default:
}
releaseMessageState(ms)
case <-s.stopChan:
return
}
}
}
type messageState struct {
messageType protocol.MessageType
res messageResponse
noti messageNotification
}
type messageResponse struct {
requestCodec protocol.ServerRequestCodec
result interface{}
err error
}
type messageNotification struct {
method string
args []interface{}
}
var messageStatePool sync.Pool
func retainMessageState(messageType protocol.MessageType) *messageState {
var ms *messageState
v := messageStatePool.Get()
if v == nil {
ms = &messageState{}
} else {
ms = v.(*messageState)
}
ms.messageType = messageType
return ms
}
func releaseMessageState(ms *messageState) {
ms.messageType = protocol.MessageTypeUnknown
ms.res.requestCodec = nil
ms.res.result = nil
ms.res.err = nil
ms.noti.method = ""
ms.noti.args = nil
messageStatePool.Put(ms)
}