This commit is contained in:
crusader 2017-08-25 14:40:04 +09:00
parent 13e42d4489
commit 4a99cee3bc
9 changed files with 135 additions and 54 deletions

View File

@ -1,15 +1,12 @@
package overflow_gateway_websocket
import (
"encoding/json"
"fmt"
"io"
"log"
"sync"
"time"
"git.loafle.net/overflow/overflow_gateway_websocket/protocol"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
)
@ -58,7 +55,7 @@ func (c *client) run() {
hasReadTimeout := c.o.ReadTimeout > 0
c.conn.SetReadLimit(c.o.MaxMessageSize)
defer func() {
c.o.OnDisconnected(c)
c.o.onDisconnected(c)
}()
for {
@ -81,27 +78,9 @@ func (c *client) run() {
}
func (c *client) onMessage(messageType int, r io.Reader) {
var err error
req := &protocol.ServerRequest{}
err = json.NewDecoder(r).Decode(req)
if err != nil {
}
if nil != req.Id {
c.onRequest(req)
} else {
c.onNotify(req)
}
}
func (c *client) onRequest(req *protocol.ServerRequest) {
var err error
result, err := c.o.OnRequest(c, req.Method, req.Params)
res := &protocol.ServerResponse{
Id: req.Id,
Result: result,
Error: err,
result := c.o.Handler.OnMessage(c, messageType, r)
if nil == result {
return
}
c.writeMTX.Lock()
@ -111,26 +90,10 @@ func (c *client) onRequest(req *protocol.ServerRequest) {
return
}
j, err := json.Marshal(res)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}
err = c.conn.WriteMessage(c.messageType, j)
err := c.conn.WriteMessage(c.messageType, result)
c.writeMTX.Unlock()
if nil != err {
}
}
func (c *client) onNotify(req *protocol.ServerRequest) {
err := c.o.OnNotify(c, req.Method, req.Params)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}
}
func (c *client) sendError() {
}

View File

@ -1,15 +1,14 @@
package overflow_gateway_websocket
import (
"log"
"time"
"github.com/valyala/fasthttp"
)
type (
OnRequestFunc func(c Client, method string, params interface{}) (interface{}, error)
OnNotifyFunc func(c Client, method string, params interface{}) error
OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error)
OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error)
)
const (
@ -29,9 +28,9 @@ const (
// ClientOptions is configuration of the websocket server
type ClientOptions struct {
OnRequest OnRequestFunc
OnNotify OnNotifyFunc
OnDisconnected OnDisconnectedFunc
onDisconnected OnDisconnectedFunc
Handler MessageHandler
MaxMessageSize int64
WriteTimeout time.Duration
@ -44,6 +43,11 @@ type ClientOptions struct {
// Validate validates the configuration
func (o *ClientOptions) Validate() *ClientOptions {
if nil == o.Handler {
log.Fatalf("Message Handler must specified.\n")
return nil
}
if o.WriteTimeout < 0 {
o.WriteTimeout = DefaultWriteTimeout
}
@ -76,8 +80,8 @@ func (o *ClientOptions) Validate() *ClientOptions {
}
}
if o.OnDisconnected == nil {
o.OnDisconnected = func(c Client) {
if o.onDisconnected == nil {
o.onDisconnected = func(c Client) {
}
}

7
hadler.go Normal file
View File

@ -0,0 +1,7 @@
package overflow_gateway_websocket
import "io"
type MessageHandler interface {
OnMessage(c Client, messageType int, r io.Reader) []byte
}

View File

@ -0,0 +1,69 @@
package jsonrpc
import (
"encoding/json"
"fmt"
"io"
"log"
gws "git.loafle.net/overflow/overflow_gateway_websocket"
)
type MessageHandler interface {
gws.MessageHandler
}
type messageHandler struct {
o *Options
}
func NewHandler(o *Options) MessageHandler {
h := &messageHandler{
o: o,
}
return h
}
func (h *messageHandler) OnMessage(c gws.Client, messageType int, r io.Reader) []byte {
var err error
req := &ServerRequest{}
err = json.NewDecoder(r).Decode(req)
if err != nil {
}
var result []byte
if nil != req.Id {
result = h.onRequest(c, req)
} else {
h.onNotify(c, req)
}
return result
}
func (h *messageHandler) onRequest(c gws.Client, req *ServerRequest) []byte {
var err error
result, err := h.o.OnRequest(c, req.Method, req.Params)
res := &ServerResponse{
Id: req.Id,
Result: result,
Error: err,
}
j, err := json.Marshal(res)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}
return j
}
func (h *messageHandler) onNotify(c gws.Client, req *ServerRequest) {
err := h.o.OnNotify(c, req.Method, req.Params)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}
}

View File

@ -0,0 +1,33 @@
package jsonrpc
import (
gws "git.loafle.net/overflow/overflow_gateway_websocket"
)
type (
OnRequestFunc func(c gws.Client, method string, params interface{}) (interface{}, error)
OnNotifyFunc func(c gws.Client, method string, params interface{}) error
)
// ClientOptions is configuration of the websocket server
type Options struct {
OnRequest OnRequestFunc
OnNotify OnNotifyFunc
}
// Validate validates the configuration
func (o *Options) Validate() *Options {
if o.OnRequest == nil {
o.OnRequest = func(c gws.Client, method string, params interface{}) (interface{}, error) {
return nil, nil
}
}
if o.OnNotify == nil {
o.OnNotify = func(c gws.Client, method string, params interface{}) error {
return nil
}
}
return o
}

View File

@ -1,4 +1,4 @@
package protocol
package jsonrpc
type ServerErrorCode int

View File

@ -1,4 +1,4 @@
package protocol
package jsonrpc
import (
"encoding/json"

View File

@ -1,4 +1,4 @@
package protocol
package jsonrpc
import (
"encoding/json"

View File

@ -36,7 +36,7 @@ func NewServer(o *ServerOptions) Server {
ReadBufferSize: s._option.ReadBufferSize,
WriteBufferSize: s._option.WriteBufferSize,
CheckOrigin: s._option.OnCheckOrigin,
Error: s._option.OnError,
Error: s.onError,
EnableCompression: s._option.EnableCompression,
}
@ -47,6 +47,10 @@ func (s *server) onPush(cb OnPushFunc) {
}
func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
}
func (s *server) onDisconnected(c Client) {
delete(s._clients, c.ID())
@ -80,6 +84,7 @@ func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
}
func (s *server) HandleClient(pattern string, o *ClientOptions) {
o.onDisconnected = s.onDisconnected
s._handlers[pattern] = o.Validate()
}