This commit is contained in:
crusader 2017-08-25 17:13:21 +09:00
parent f6cf23940d
commit 03e1946210
8 changed files with 133 additions and 133 deletions

View File

@ -1,99 +0,0 @@
package overflow_gateway_websocket
import (
"fmt"
"io"
"log"
"sync"
"time"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
)
type Client interface {
ID() string
Conn() *websocket.Conn
Path() string
run()
}
type client struct {
id string
o *ClientOptions
conn *websocket.Conn
path string
messageType int
writeMTX sync.Mutex
}
func NewClient(id string, path string, o *ClientOptions, conn *websocket.Conn) Client {
c := &client{
id: id,
o: o,
conn: conn,
path: path,
messageType: websocket.TextMessage,
}
return c
}
func (c *client) ID() string {
return c.id
}
func (c *client) Conn() *websocket.Conn {
return c.conn
}
func (c *client) Path() string {
return c.path
}
func (c *client) run() {
hasReadTimeout := c.o.ReadTimeout > 0
c.conn.SetReadLimit(c.o.MaxMessageSize)
defer func() {
c.o.onDisconnected(c)
}()
for {
if hasReadTimeout {
c.conn.SetReadDeadline(time.Now().Add(c.o.ReadTimeout))
}
// messageType, data, err := c.conn.ReadMessage()
messageType, r, err := c.conn.NextReader()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
//c.fireError(err)
}
break
} else {
c.onMessage(messageType, r)
}
}
}
func (c *client) onMessage(messageType int, r io.Reader) {
result := c.o.Handler.OnMessage(c, messageType, r)
if nil == result {
return
}
c.writeMTX.Lock()
if writeTimeout := c.o.WriteTimeout; writeTimeout > 0 {
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Println(fmt.Errorf("%v", err))
return
}
err := c.conn.WriteMessage(c.messageType, result)
c.writeMTX.Unlock()
if nil != err {
}
}

View File

@ -3,5 +3,5 @@ package overflow_gateway_websocket
import "io"
type MessageHandler interface {
OnMessage(c Client, messageType int, r io.Reader) []byte
OnMessage(soc Socket, messageType int, r io.Reader) []byte
}

View File

@ -25,7 +25,7 @@ func NewHandler(o *Options) MessageHandler {
return h
}
func (h *messageHandler) OnMessage(c gws.Client, messageType int, r io.Reader) []byte {
func (h *messageHandler) OnMessage(soc gws.Socket, messageType int, r io.Reader) []byte {
var err error
req := &ServerRequest{}
err = json.NewDecoder(r).Decode(req)
@ -35,17 +35,17 @@ func (h *messageHandler) OnMessage(c gws.Client, messageType int, r io.Reader) [
var result []byte
if nil != req.Id {
result = h.onRequest(c, req)
result = h.onRequest(soc, req)
} else {
h.onNotify(c, req)
h.onNotify(soc, req)
}
return result
}
func (h *messageHandler) onRequest(c gws.Client, req *ServerRequest) []byte {
func (h *messageHandler) onRequest(soc gws.Socket, req *ServerRequest) []byte {
var err error
result, err := h.o.OnRequest(c, req.Method, req.Params)
result, err := h.o.OnRequest(soc, req.Method, req.Params)
res := &ServerResponse{
Id: req.Id,
@ -61,8 +61,8 @@ func (h *messageHandler) onRequest(c gws.Client, req *ServerRequest) []byte {
return j
}
func (h *messageHandler) onNotify(c gws.Client, req *ServerRequest) {
err := h.o.OnNotify(c, req.Method, req.Params)
func (h *messageHandler) onNotify(soc gws.Socket, req *ServerRequest) {
err := h.o.OnNotify(soc, req.Method, req.Params)
if nil != err {
log.Println(fmt.Errorf("%v", err))
}

View File

@ -5,8 +5,8 @@ import (
)
type (
OnRequestFunc func(c gws.Client, method string, params interface{}) (interface{}, error)
OnNotifyFunc func(c gws.Client, method string, params interface{}) error
OnRequestFunc func(soc gws.Socket, method string, params interface{}) (interface{}, error)
OnNotifyFunc func(soc gws.Socket, method string, params interface{}) error
)
// ClientOptions is configuration of the websocket server
@ -18,13 +18,13 @@ type Options struct {
// Validate validates the configuration
func (o *Options) Validate() *Options {
if o.OnRequest == nil {
o.OnRequest = func(c gws.Client, method string, params interface{}) (interface{}, error) {
o.OnRequest = func(soc gws.Socket, method string, params interface{}) (interface{}, error) {
return nil, nil
}
}
if o.OnNotify == nil {
o.OnNotify = func(c gws.Client, method string, params interface{}) error {
o.OnNotify = func(soc gws.Socket, method string, params interface{}) error {
return nil
}
}

View File

@ -12,22 +12,22 @@ type ()
type Server interface {
ListenAndServe(addr string) error
HandleClient(pattern string, o *ClientOptions)
HandleSocket(pattern string, o *SocketOptions)
}
type server struct {
_option *ServerOptions
_upgrader *websocket.Upgrader
_handlers map[string]*ClientOptions
_clients map[string]Client
_handlers map[string]*SocketOptions
_sockets map[string]Socket
_cMtx sync.Mutex
}
func NewServer(o *ServerOptions) Server {
s := &server{
_option: o.Validate(),
_handlers: make(map[string]*ClientOptions, 1),
_clients: make(map[string]Client, 100),
_handlers: make(map[string]*SocketOptions, 1),
_sockets: make(map[string]Socket, 100),
}
s._upgrader = &websocket.Upgrader{
@ -49,10 +49,10 @@ func (s *server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
s._option.OnError(ctx, status, reason)
}
func (s *server) onDisconnected(c Client) {
delete(s._clients, c.ID())
func (s *server) onDisconnected(soc Socket) {
delete(s._sockets, soc.ID())
s._option.OnDisconnected(c)
s._option.OnDisconnected(soc)
}
func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
@ -69,18 +69,18 @@ func (s *server) onConnection(ctx *fasthttp.RequestCtx) {
return
}
s._cMtx.Lock()
cid := s._option.IDGenerator(ctx)
c := NewClient(cid, path, co, conn)
s._clients[cid] = c
id := s._option.IDGenerator(ctx)
soc := NewSocket(id, path, co, conn)
s._sockets[id] = soc
s._cMtx.Unlock()
s._option.OnConnection(c)
s._option.OnConnection(soc)
c.run()
soc.run()
})
}
func (s *server) HandleClient(pattern string, o *ClientOptions) {
func (s *server) HandleSocket(pattern string, o *SocketOptions) {
o.onDisconnected = s.onDisconnected
s._handlers[pattern] = o.Validate()

View File

@ -8,8 +8,8 @@ import (
)
type (
OnConnectionFunc func(c Client)
OnDisconnectedFunc func(c Client)
OnConnectionFunc func(soc Socket)
OnDisconnectedFunc func(soc Socket)
)
const (
@ -53,12 +53,12 @@ func (o *ServerOptions) Validate() *ServerOptions {
}
if o.OnConnection == nil {
o.OnConnection = func(c Client) {
o.OnConnection = func(soc Socket) {
}
}
if o.OnDisconnected == nil {
o.OnDisconnected = func(c Client) {
o.OnDisconnected = func(soc Socket) {
}
}

99
socket.go Normal file
View File

@ -0,0 +1,99 @@
package overflow_gateway_websocket
import (
"fmt"
"io"
"log"
"sync"
"time"
"git.loafle.net/overflow/overflow_gateway_websocket/websocket"
)
type Socket interface {
ID() string
Conn() *websocket.Conn
Path() string
run()
}
type socket struct {
id string
o *SocketOptions
conn *websocket.Conn
path string
messageType int
writeMTX sync.Mutex
}
func NewSocket(id string, path string, o *SocketOptions, conn *websocket.Conn) Socket {
c := &socket{
id: id,
o: o,
conn: conn,
path: path,
messageType: websocket.TextMessage,
}
return c
}
func (soc *socket) ID() string {
return soc.id
}
func (soc *socket) Conn() *websocket.Conn {
return soc.conn
}
func (soc *socket) Path() string {
return soc.path
}
func (soc *socket) run() {
hasReadTimeout := soc.o.ReadTimeout > 0
soc.conn.SetReadLimit(soc.o.MaxMessageSize)
defer func() {
soc.o.onDisconnected(soc)
}()
for {
if hasReadTimeout {
soc.conn.SetReadDeadline(time.Now().Add(soc.o.ReadTimeout))
}
// messageType, data, err := c.conn.ReadMessage()
messageType, r, err := soc.conn.NextReader()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
//c.fireError(err)
}
break
} else {
soc.onMessage(messageType, r)
}
}
}
func (soc *socket) onMessage(messageType int, r io.Reader) {
result := soc.o.Handler.OnMessage(soc, messageType, r)
if nil == result {
return
}
soc.writeMTX.Lock()
if writeTimeout := soc.o.WriteTimeout; writeTimeout > 0 {
err := soc.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Println(fmt.Errorf("%v", err))
return
}
err := soc.conn.WriteMessage(soc.messageType, result)
soc.writeMTX.Unlock()
if nil != err {
}
}

View File

@ -26,8 +26,8 @@ const (
DefaultMaxMessageSize = 1024
)
// ClientOptions is configuration of the websocket server
type ClientOptions struct {
// SocketOptions is configuration of the websocket server
type SocketOptions struct {
onDisconnected OnDisconnectedFunc
Handler MessageHandler
@ -42,7 +42,7 @@ type ClientOptions struct {
}
// Validate validates the configuration
func (o *ClientOptions) Validate() *ClientOptions {
func (o *SocketOptions) Validate() *SocketOptions {
if nil == o.Handler {
log.Fatalf("Message Handler must specified.\n")
return nil
@ -69,7 +69,7 @@ func (o *ClientOptions) Validate() *ClientOptions {
}
if o.onDisconnected == nil {
o.onDisconnected = func(c Client) {
o.onDisconnected = func(soc Socket) {
}
}