This commit is contained in:
crusader 2017-08-31 19:08:14 +09:00
parent 65ac3aa84e
commit 8299f95f4c
13 changed files with 190 additions and 276 deletions

View File

@ -1,72 +0,0 @@
package jsonrpc
import (
"context"
"encoding/json"
"io"
"log"
gws "git.loafle.net/overflow/overflow_gateway_websocket"
)
type MessageHandler interface {
gws.MessageHandler
}
type messageHandler struct {
ctx context.Context
o *Options
}
func NewHandler(ctx context.Context, o *Options) MessageHandler {
h := &messageHandler{
ctx: ctx,
o: o,
}
return h
}
func (h *messageHandler) OnMessage(soc gws.Socket, messageType int, r io.Reader) []byte {
var err error
req := &ServerRequest{}
err = json.NewDecoder(r).Decode(req)
if err != nil {
}
var result []byte
if nil != req.Id {
result = h.onRequest(soc, req)
} else {
h.onNotify(soc, req)
}
return result
}
func (h *messageHandler) onRequest(soc gws.Socket, req *ServerRequest) []byte {
var err error
result, err := h.o.OnRequest(soc, req.Method, req.Params)
res := &ServerResponse{
Protocol: req.Protocol,
Result: result,
Error: err,
Id: req.Id,
}
j, err := json.Marshal(res)
if nil != err {
log.Printf("JSON RPC error: %v", err)
}
return j
}
func (h *messageHandler) onNotify(soc gws.Socket, req *ServerRequest) {
err := h.o.OnNotify(soc, req.Method, req.Params)
if nil != err {
log.Printf("JSON RPC error: %v", err)
}
}

View File

@ -0,0 +1,77 @@
package jsonrpc
import (
"context"
"encoding/json"
"fmt"
"io"
"go.uber.org/zap"
"git.loafle.net/commons_go/logging"
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type JSONRpc interface {
ogw.ProtocolHandler
}
type jsonRpc struct {
ctx context.Context
logger *zap.Logger
jh JSONRpcHandler
}
func New(ctx context.Context, jh JSONRpcHandler) JSONRpc {
jr := &jsonRpc{
ctx: ctx,
logger: logging.WithContext(ctx),
jh: jh,
}
return jr
}
func (jr *jsonRpc) OnMessage(soc ogw.Socket, messageType int, r io.Reader) []byte {
var err error
req := &ServerRequest{}
err = json.NewDecoder(r).Decode(req)
if err != nil {
}
var result []byte
if nil != req.Id {
result = jr.onRequest(soc, req)
} else {
jr.onNotify(soc, req)
}
return result
}
func (jr *jsonRpc) onRequest(soc ogw.Socket, req *ServerRequest) []byte {
var err error
result, err := jr.jh.OnRequest(soc, req.Method, req.Params)
res := &ServerResponse{
Protocol: req.Protocol,
Result: result,
Error: err,
Id: req.Id,
}
j, err := json.Marshal(res)
if nil != err {
jr.logger.Error(fmt.Sprintf("JSON RPC error: %v", err))
}
return j
}
func (jr *jsonRpc) onNotify(soc ogw.Socket, req *ServerRequest) {
err := jr.jh.OnNotify(soc, req.Method, req.Params)
if nil != err {
jr.logger.Error(fmt.Sprintf("JSON RPC error: %v", err))
}
}

View File

@ -0,0 +1,11 @@
package jsonrpc
import (
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
// ClientOptions is configuration of the websocket server
type JSONRpcHandler interface {
OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error)
OnNotify(soc ogw.Socket, method string, params []string) error
}

View File

@ -0,0 +1,22 @@
package jsonrpc
import (
ogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
// SocketOptions is configuration of the websocket server
type JSONRpcHandlers struct {
}
func (jh *JSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) {
return nil, nil
}
func (jh *JSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error {
return nil
}
// Validate validates the configuration
func (js *JSONRpcHandlers) Validate() {
}

View File

@ -1,33 +0,0 @@
package jsonrpc
import (
gws "git.loafle.net/overflow/overflow_gateway_websocket"
)
type (
OnRequestFunc func(soc gws.Socket, method string, params []string) (interface{}, error)
OnNotifyFunc func(soc gws.Socket, method string, params []string) error
)
// ClientOptions is configuration of the websocket server
type Options struct {
OnRequest OnRequestFunc
OnNotify OnNotifyFunc
}
// Validate validates the configuration
func (o *Options) Validate() *Options {
if o.OnRequest == nil {
o.OnRequest = func(soc gws.Socket, method string, params []string) (interface{}, error) {
return nil, nil
}
}
if o.OnNotify == nil {
o.OnNotify = func(soc gws.Socket, method string, params []string) error {
return nil
}
}
return o
}

View File

@ -2,6 +2,6 @@ package overflow_gateway_websocket
import "io"
type MessageHandler interface {
type ProtocolHandler interface {
OnMessage(soc Socket, messageType int, r io.Reader) []byte
}

View File

@ -4,6 +4,9 @@ import (
"context"
"net/http"
"go.uber.org/zap"
"git.loafle.net/commons_go/logging"
channelUtil "git.loafle.net/commons_go/util/channel"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
"github.com/valyala/fasthttp"
@ -18,14 +21,15 @@ type socketsChannelAction struct {
type Server interface {
ListenAndServe(addr string) error
HandleSocket(pattern string, o *SocketOptions)
HandleSocket(pattern string, o SocketHandler)
}
type server struct {
_ctx context.Context
_logger *zap.Logger
_sh ServerHandler
_upgrader *websocket.Upgrader
_handlers map[string]*SocketOptions
_handlers map[string]SocketHandler
_sockets map[string]Socket
_socketsCh chan socketsChannelAction
}
@ -35,8 +39,9 @@ func NewServer(ctx context.Context, sh ServerHandler) Server {
s := &server{
_ctx: ctx,
_logger: logging.WithContext(ctx),
_sh: sh,
_handlers: make(map[string]*SocketOptions, 1),
_handlers: make(map[string]SocketHandler, 1),
_sockets: make(map[string]Socket, 100),
_socketsCh: make(chan socketsChannelAction),
}
@ -125,10 +130,11 @@ func (s *server) Sockets() map[string]Socket {
return s._sockets
}
func (s *server) HandleSocket(pattern string, o *SocketOptions) {
o.onDisconnected = s.onDisconnected
func (s *server) HandleSocket(pattern string, soch SocketHandler) {
soch.(*SocketHandlers).onDisconnected = s.onDisconnected
s._handlers[pattern] = o.Validate()
soch.Validate()
s._handlers[pattern] = soch
}
func (s *server) ListenAndServe(addr string) error {

View File

@ -7,6 +7,17 @@ import (
"github.com/valyala/fasthttp"
)
const (
// DefaultHandshakeTimeout is default value of websocket handshake Timeout
DefaultHandshakeTimeout = 0
// DefaultReadBufferSize is default value of Read Buffer Size
DefaultReadBufferSize = 4096
// DefaultWriteBufferSize is default value of Write Buffer Size
DefaultWriteBufferSize = 4096
// DefaultEnableCompression is default value of support compression
DefaultEnableCompression = false
)
// ServerHandlers is implementation of the Server handler interface
type ServerHandlers struct {
HandshakeTimeout time.Duration

View File

@ -1,82 +0,0 @@
package overflow_gateway_websocket
import (
"time"
uuid "github.com/satori/go.uuid"
"github.com/valyala/fasthttp"
)
type (
OnConnectionFunc func(soc Socket)
OnDisconnectedFunc func(soc Socket)
OnCheckOriginFunc func(ctx *fasthttp.RequestCtx) bool
)
const (
// DefaultHandshakeTimeout is default value of websocket handshake Timeout
DefaultHandshakeTimeout = 0
// DefaultReadBufferSize is default value of Read Buffer Size
DefaultReadBufferSize = 4096
// DefaultWriteBufferSize is default value of Write Buffer Size
DefaultWriteBufferSize = 4096
// DefaultEnableCompression is default value of support compression
DefaultEnableCompression = false
)
var (
// DefaultIDGenerator returns the UUID of the client
DefaultIDGenerator = func(ctx *fasthttp.RequestCtx) string { return uuid.NewV4().String() }
)
// ServerOptions is configuration of the websocket server
type ServerOptions struct {
OnConnection OnConnectionFunc
OnDisconnected OnDisconnectedFunc
OnCheckOrigin OnCheckOriginFunc
OnError func(ctx *fasthttp.RequestCtx, status int, reason error)
IDGenerator func(ctx *fasthttp.RequestCtx) string
HandshakeTimeout time.Duration
ReadBufferSize int
WriteBufferSize int
EnableCompression bool
}
// Validate validates the configuration
func (o *ServerOptions) Validate() *ServerOptions {
if o.ReadBufferSize <= 0 {
o.ReadBufferSize = DefaultReadBufferSize
}
if o.WriteBufferSize <= 0 {
o.WriteBufferSize = DefaultWriteBufferSize
}
if o.OnConnection == nil {
o.OnConnection = func(soc Socket) {
}
}
if o.OnDisconnected == nil {
o.OnDisconnected = func(soc Socket) {
}
}
if o.OnError == nil {
o.OnError = func(ctx *fasthttp.RequestCtx, status int, reason error) {
}
}
if o.OnCheckOrigin == nil {
o.OnCheckOrigin = func(ctx *fasthttp.RequestCtx) bool {
return true
}
}
if o.IDGenerator == nil {
o.IDGenerator = DefaultIDGenerator
}
return o
}

View File

@ -1,56 +0,0 @@
package overflow_gateway_websocket
import (
"time"
uuid "github.com/satori/go.uuid"
"github.com/valyala/fasthttp"
)
// ServerOptions is configuration of the websocket server
type ServerOptions2 struct {
HandshakeTimeout time.Duration
ReadBufferSize int
WriteBufferSize int
EnableCompression bool
}
func (o *ServerOptions2) GetHandshakeTimeout() time.Duration {
return o.HandshakeTimeout
}
func (o *ServerOptions2) GetReadBufferSize() int {
return o.ReadBufferSize
}
func (o *ServerOptions2) GetWriteBufferSize() int {
return o.WriteBufferSize
}
func (o *ServerOptions2) GetEnableCompression() bool {
return o.EnableCompression
}
func (o *ServerOptions2) OnConnection(soc Socket) {
}
func (o *ServerOptions2) OnDisconnected(soc Socket) {
}
func (o *ServerOptions2) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool {
return true
}
func (o *ServerOptions2) OnError(ctx *fasthttp.RequestCtx, status int, reason error) {
}
func (o *ServerOptions2) OnIDGenerate(ctx *fasthttp.RequestCtx) string {
return uuid.NewV4().String()
}
// Validate validates the configuration
func (o *ServerOptions2) Validate() {
if o.ReadBufferSize <= 0 {
o.ReadBufferSize = DefaultReadBufferSize
}
if o.WriteBufferSize <= 0 {
o.WriteBufferSize = DefaultWriteBufferSize
}
}

View File

@ -20,7 +20,7 @@ type Socket interface {
type socket struct {
ctx context.Context
id string
o *SocketOptions
sh SocketHandler
conn *websocket.Conn
path string
messageType int
@ -28,11 +28,11 @@ type socket struct {
disconnectCh chan bool
}
func NewSocket(ctx context.Context, id string, path string, o *SocketOptions, conn *websocket.Conn) Socket {
func NewSocket(ctx context.Context, id string, path string, sh SocketHandler, conn *websocket.Conn) Socket {
c := &socket{
ctx: ctx,
id: id,
o: o,
sh: sh,
conn: conn,
path: path,
writeCh: make(chan []byte),
@ -60,8 +60,8 @@ func (soc *socket) Write(m []byte) {
}
func (soc *socket) run() {
hasReadTimeout := soc.o.ReadTimeout > 0
soc.conn.SetReadLimit(soc.o.MaxMessageSize)
hasReadTimeout := soc.sh.GetReadTimeout() > 0
soc.conn.SetReadLimit(soc.sh.GetMaxMessageSize())
defer func() {
soc.onDisconnected()
@ -71,7 +71,7 @@ func (soc *socket) run() {
for {
if hasReadTimeout {
soc.conn.SetReadDeadline(time.Now().Add(soc.o.ReadTimeout))
soc.conn.SetReadDeadline(time.Now().Add(soc.sh.GetReadTimeout()))
}
// messageType, data, err := c.conn.ReadMessage()
@ -90,11 +90,11 @@ func (soc *socket) run() {
func (soc *socket) onDisconnected() {
soc.disconnectCh <- true
soc.o.onDisconnected(soc)
soc.sh.(*SocketHandlers).onDisconnected(soc)
}
func (soc *socket) onMessage(messageType int, r io.Reader) {
result := soc.o.Handler.OnMessage(soc, messageType, r)
result := soc.sh.GetProtocolHandler().OnMessage(soc, messageType, r)
if nil == result {
return
}
@ -106,7 +106,7 @@ func (soc *socket) listenWrite() {
select {
// send message to the client
case w := <-soc.writeCh:
if writeTimeout := soc.o.WriteTimeout; writeTimeout > 0 {
if writeTimeout := soc.sh.GetWriteTimeout(); writeTimeout > 0 {
err := soc.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Printf("Socket timeout: %v", err)
}

17
socket_handler.go Normal file
View File

@ -0,0 +1,17 @@
package overflow_gateway_websocket
import "time"
// SocketHandler is configuration of the websocket server
type SocketHandler interface {
GetMaxMessageSize() int64
GetWriteTimeout() time.Duration
GetReadTimeout() time.Duration
GetPongTimeout() time.Duration
GetPingTimeout() time.Duration
GetPingPeriod() time.Duration
IsBinaryMessage() bool
GetProtocolHandler() ProtocolHandler
Validate()
}

View File

@ -3,12 +3,6 @@ package overflow_gateway_websocket
import (
"log"
"time"
"github.com/valyala/fasthttp"
)
type (
OnErrorFunc func(ctx *fasthttp.RequestCtx, status int, reason error)
)
const (
@ -27,10 +21,10 @@ const (
)
// SocketOptions is configuration of the websocket server
type SocketOptions struct {
onDisconnected OnDisconnectedFunc
type SocketHandlers struct {
onDisconnected func(soc Socket)
Handler MessageHandler
Handler ProtocolHandler
MaxMessageSize int64
WriteTimeout time.Duration
@ -41,11 +35,36 @@ type SocketOptions struct {
BinaryMessage bool
}
func (sh *SocketHandlers) GetMaxMessageSize() int64 {
return sh.MaxMessageSize
}
func (sh *SocketHandlers) GetWriteTimeout() time.Duration {
return sh.WriteTimeout
}
func (sh *SocketHandlers) GetReadTimeout() time.Duration {
return sh.ReadTimeout
}
func (sh *SocketHandlers) GetPongTimeout() time.Duration {
return sh.PongTimeout
}
func (sh *SocketHandlers) GetPingTimeout() time.Duration {
return sh.PingTimeout
}
func (sh *SocketHandlers) GetPingPeriod() time.Duration {
return sh.PingPeriod
}
func (sh *SocketHandlers) IsBinaryMessage() bool {
return sh.BinaryMessage
}
func (sh *SocketHandlers) GetProtocolHandler() ProtocolHandler {
return sh.Handler
}
// Validate validates the configuration
func (o *SocketOptions) Validate() *SocketOptions {
func (o *SocketHandlers) Validate() {
if nil == o.Handler {
log.Fatalf("Message Handler must specified.\n")
return nil
}
if o.WriteTimeout < 0 {
@ -68,10 +87,4 @@ func (o *SocketOptions) Validate() *SocketOptions {
o.MaxMessageSize = DefaultMaxMessageSize
}
if o.onDisconnected == nil {
o.onDisconnected = func(soc Socket) {
}
}
return o
}