package server import ( olog "git.loafle.net/overflow/log-go" orp "git.loafle.net/overflow/rpc-go/protocol" orr "git.loafle.net/overflow/rpc-go/registry" "git.loafle.net/overflow/server-go" oss "git.loafle.net/overflow/server-go/socket" ossw "git.loafle.net/overflow/server-go/socket/web" "github.com/valyala/fasthttp" ) type ScannerServlet interface { ossw.Servlet } type ScannerServlets struct { ossw.Servlets RPCInvoker orr.RPCInvoker RPCServerCodec orp.ServerCodec } func (s *ScannerServlets) Init(serverCtx server.ServerCtx) error { if err := s.Servlets.Init(serverCtx); nil != err { return err } return nil } func (s *ScannerServlets) OnStart(serverCtx server.ServerCtx) error { if err := s.Servlets.OnStart(serverCtx); nil != err { return err } return nil } func (s *ScannerServlets) OnStop(serverCtx server.ServerCtx) { s.Servlets.OnStop(serverCtx) } func (s *ScannerServlets) Destroy(serverCtx server.ServerCtx) { s.Servlets.Destroy(serverCtx) } func (s *ScannerServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) { return nil, nil } func (s *ScannerServlets) OnConnect(servletCtx server.ServletCtx, conn oss.Conn) { s.Servlets.OnConnect(servletCtx, conn) } func (s *ScannerServlets) OnDisconnect(servletCtx server.ServletCtx) { s.Servlets.OnDisconnect(servletCtx) } func (s *ScannerServlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan oss.SocketMessage, writeChan chan<- oss.SocketMessage) { var ( src orp.ServerRequestCodec messageType int message []byte result interface{} resMessageType int resMessage []byte err error ) for { select { case socketMessage, ok := <-readChan: if !ok { return } messageType, message = socketMessage() // grpc exec method call src, err = s.RPCServerCodec.NewRequest(messageType, message) if nil != err { olog.Logger().Error(err.Error()) break } if !s.RPCInvoker.HasMethod(src.Method()) { olog.Logger().Error(err.Error()) s.writeError(src, writeChan, orp.E_NO_METHOD, "", err) break } result, err = s.RPCInvoker.Invoke(src) if nil != err { olog.Logger().Error(err.Error()) } if !src.HasResponse() { break } resMessageType, resMessage, err = src.NewResponse(result, err) if nil != err { olog.Logger().Error(err.Error()) s.writeError(src, writeChan, orp.E_INTERNAL, "", err) break } writeChan <- oss.MakeSocketMessage(resMessageType, resMessage) case <-stopChan: return } } } func (s *ScannerServlets) writeError(src orp.ServerRequestCodec, writeChan chan<- oss.SocketMessage, code orp.ErrorCode, message string, data interface{}) { if !src.HasResponse() { return } pErr := &orp.Error{ Code: code, Message: message, Data: data, } resMessageType, resMessage, err := src.NewResponse(nil, pErr) if nil != err { olog.Logger().Error(err.Error()) return } writeChan <- oss.MakeSocketMessage(resMessageType, resMessage) }