This commit is contained in:
crusader 2017-08-24 19:42:40 +09:00
parent 67de428f46
commit 37ce0a3668
8 changed files with 221 additions and 36 deletions

View File

@ -1,7 +1,137 @@
package clients
import (
"encoding/json"
"fmt"
"io"
"log"
"sync"
"time"
"git.loafle.net/overflow/overflow_gateway_websocket/clients/protocol"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"github.com/valyala/fasthttp"
)
type Client interface {
ID() string
Conn() *websocket.Conn
RequestCtx() *fasthttp.RequestCtx
}
type client struct {
id string
o *Options
fastHTTPCtx *fasthttp.RequestCtx
conn *websocket.Conn
messageType int
writeMTX sync.Mutex
}
func New(id string, o *Options, ctx *fasthttp.RequestCtx, conn *websocket.Conn) Client {
c := &client{
id: id,
o: o,
fastHTTPCtx: ctx,
conn: conn,
messageType: websocket.TextMessage,
}
c.run()
return c
}
func (c *client) ID() string {
return c.id
}
func (c *client) Conn() *websocket.Conn {
return c.conn
}
func (c *client) RequestCtx() *fasthttp.RequestCtx {
return c.fastHTTPCtx
}
func (c *client) run() {
hasReadTimeout := c.o.ReadTimeout > 0
c.conn.SetReadLimit(c.o.MaxMessageSize)
defer func() {
c.o.OnDisconnected(c)
}()
for {
if hasReadTimeout {
c.conn.SetReadDeadline(time.Now().Add(c.o.ReadTimeout))
}
// messageType, data, err := c.conn.ReadMessage()
messageType, r, err := c.conn.NextReader()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
//c.fireError(err)
}
break
} else {
c.onMessage(messageType, r)
}
}
}
func (c *client) onMessage(messageType int, r io.Reader) {
var err error
req := &protocol.ServerRequest{}
err = json.NewDecoder(r).Decode(req)
if err != nil {
}
if nil != req.Id {
c.onRequest(req)
} else {
c.onNotify(req)
}
}
func (c *client) onRequest(req *protocol.ServerRequest) {
var err error
result, err := c.o.OnRequest(c, req.Method, req.Params)
res := &protocol.ServerResponse{
Id: req.Id,
Result: result,
Error: err,
}
c.writeMTX.Lock()
if writeTimeout := c.o.WriteTimeout; writeTimeout > 0 {
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Println(fmt.Errorf("%v", err))
return
}
j, err := json.Marshal(res)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}
err = c.conn.WriteMessage(c.messageType, j)
c.writeMTX.Unlock()
if nil != err {
}
}
func (c *client) onNotify(req *protocol.ServerRequest) {
err := c.o.OnNotify(c, req.Method, req.Params)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}
}
func (c *client) sendError() {
}

View File

@ -1,7 +0,0 @@
package clients
type Context interface {
}
type context struct {
}

42
clients/options.go Normal file
View File

@ -0,0 +1,42 @@
package clients
import "time"
const ()
// Options is configuration of the websocket server
type Options struct {
OnRequest func(c Client, method string, params interface{}) (interface{}, error)
OnNotify func(c Client, method string, params interface{}) error
OnDisconnected func(c Client)
MaxMessageSize int64
WriteTimeout time.Duration
ReadTimeout time.Duration
BinaryMessage bool
ReadBufferSize int
WriteBufferSize int
}
// Validate validates the configuration
func (o *Options) Validate() *Options {
if o.OnRequest == nil {
o.OnRequest = func(c Client, method string, params interface{}) (interface{}, error) {
return nil, nil
}
}
if o.OnNotify == nil {
o.OnNotify = func(c Client, method string, params interface{}) error {
return nil
}
}
if o.OnDisconnected == nil {
o.OnDisconnected = func(c Client) {
}
}
return o
}

View File

@ -3,19 +3,20 @@ package overflow_gateway_websocket
import (
"time"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"git.loafle.net/overflow/overflow_gateway_websocket/clients"
uuid "github.com/satori/go.uuid"
"github.com/valyala/fasthttp"
)
type (
OnConnectionFunc func(ctx *fasthttp.RequestCtx, conn *websocket.Conn)
OnRequestFunc func(method string, params interface{}) (interface{}, error)
OnNotifyFunc func(method string, params interface{})
OnPushFunc func()
OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error)
OnCheckOriginFunc func(ctx *fasthttp.RequestCtx) bool
IDGeneratorFunc func(ctx *fasthttp.RequestCtx) string
OnConnectionFunc func(c clients.Client)
OnDisconnectedFunc func(c clients.Client)
OnRequestFunc func(c clients.Client, method string, params interface{}) (interface{}, error)
OnNotifyFunc func(c clients.Client, method string, params interface{}) error
OnPushFunc func()
OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error)
OnCheckOriginFunc func(ctx *fasthttp.RequestCtx) bool
IDGeneratorFunc func(ctx *fasthttp.RequestCtx) string
)
const (
@ -47,6 +48,7 @@ var (
// Options is configuration of the websocket server
type Options struct {
OnConnection OnConnectionFunc
OnDisconnected OnDisconnectedFunc
OnRequest OnRequestFunc
OnNotify OnNotifyFunc
OnPush OnPushFunc
@ -97,18 +99,24 @@ func (o *Options) Validate() *Options {
}
if o.OnConnection == nil {
o.OnConnection = func(ctx *fasthttp.RequestCtx, conn *websocket.Conn) {
o.OnConnection = func(c clients.Client) {
}
}
if o.OnDisconnected == nil {
o.OnDisconnected = func(c clients.Client) {
}
}
if o.OnRequest == nil {
o.OnRequest = func(method string, params interface{}) (interface{}, error) {
o.OnRequest = func(c clients.Client, method string, params interface{}) (interface{}, error) {
return nil, nil
}
}
if o.OnNotify == nil {
o.OnNotify = func(method string, params interface{}) {
o.OnNotify = func(c clients.Client, method string, params interface{}) error {
return nil
}
}

View File

@ -3,6 +3,7 @@ package overflow_gateway_websocket
import (
"log"
"git.loafle.net/overflow/overflow_gateway_websocket/clients"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"github.com/valyala/fasthttp"
)
@ -14,28 +15,36 @@ type Server interface {
}
type server struct {
o *Options
upgrader *websocket.Upgrader
_option *Options
_clientPption *clients.Options
_upgrader *websocket.Upgrader
_clients map[string]clients.Client
}
func NewServer(o *Options) Server {
s := &server{
o: o.Validate(),
_option: o.Validate(),
_clients: make(map[string]clients.Client, 100),
}
s.upgrader = &websocket.Upgrader{
ReadBufferSize: s.o.ReadBufferSize,
WriteBufferSize: s.o.WriteBufferSize,
HandshakeTimeout: s.o.HandshakeTimeout,
s._upgrader = &websocket.Upgrader{
ReadBufferSize: s._option.ReadBufferSize,
WriteBufferSize: s._option.WriteBufferSize,
HandshakeTimeout: s._option.HandshakeTimeout,
}
s._clientPption = &clients.Options{
OnRequest: s._option.OnRequest,
OnDisconnected: s.onDisconnected,
ReadTimeout: s._option.ReadTimeout,
WriteTimeout: s._option.WriteTimeout,
ReadBufferSize: s._option.ReadBufferSize,
WriteBufferSize: s._option.WriteBufferSize,
MaxMessageSize: s._option.MaxMessageSize,
}
return s
}
func (s *server) onRequest(cb OnRequestFunc) {
}
func (s *server) onNotify(cb OnNotifyFunc) {
}
@ -44,22 +53,25 @@ func (s *server) onPush(cb OnPushFunc) {
}
func (s *server) requestHandler(ctx *fasthttp.RequestCtx, c *websocket.Conn) {
func (s *server) onDisconnected(c clients.Client) {
delete(s._clients, c.ID())
s._option.OnDisconnected(c)
}
func (s *server) connectionHandler(ctx *fasthttp.RequestCtx) {
s.upgrader.Upgrade(ctx, nil, func(conn *websocket.Conn, err error) {
func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
s._upgrader.Upgrade(ctx, nil, func(conn *websocket.Conn, err error) {
if err != nil {
log.Print("upgrade:", err)
return
}
s.o.OnConnection(ctx, conn)
s.requestHandler(ctx, conn)
cid := s._option.IDGenerator(ctx)
c := clients.New(cid, s._clientPption, ctx, conn)
s._clients[cid] = c
s._option.OnConnection(c)
})
}
func (s *server) ListenAndServe(addr string) error {
return fasthttp.ListenAndServe(addr, s.connectionHandler)
return fasthttp.ListenAndServe(addr, s.onConnection)
}