rpc/adapter/websocket/fasthttp/websocket_fasthttp_socket_handlers.go

83 lines
1.7 KiB
Go
Raw Normal View History

2017-11-08 06:35:27 +00:00
package fasthttp
import (
"io"
"log"
"github.com/valyala/fasthttp"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"git.loafle.net/commons_go/websocket_fasthttp/websocket"
rpcServer "git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/server"
)
type SocketHandlers struct {
cwf.SocketHandlers
RPCServerHandler rpcServer.ServerHandler
}
func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (bool, *fasthttp.ResponseHeader) {
return true, nil
}
func (sh *SocketHandlers) Handle(conn *websocket.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
contentType := "json"
codec, err := sh.RPCServerHandler.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var messageType int
var r io.Reader
var w io.WriteCloser
// conn.SetReadLimit(maxMessageSize)
// conn.SetReadDeadline(time.Now().Add(pongWait))
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if messageType, r, err = conn.NextReader(); nil != err {
doneChan <- struct{}{}
return
}
if w, err = conn.NextWriter(messageType); nil != err {
doneChan <- struct{}{}
return
}
if err = rpcServer.Handle(sh.RPCServerHandler, codec, r, w); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = w.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
func (sh *SocketHandlers) Validate() {
sh.SocketHandlers.Validate()
if nil == sh.RPCServerHandler {
panic("RPCServerHandler must be specified.")
}
}