This commit is contained in:
crusader 2017-07-13 21:01:28 +09:00
parent 1d480c52cd
commit f49a711b5a
13 changed files with 656 additions and 368 deletions

View File

@ -6,6 +6,13 @@ import:
version: ^1.2.0
- package: google.golang.org/grpc
version: ^1.4.2
subpackages:
- health/grpc_health_v1
- package: golang.org/x/net
subpackages:
- websocket
- package: github.com/gorilla/rpc
version: ^1.1.0
- package: git.loafle.net/overflow/overflow_api_server
subpackages:
- golang

46
main.go
View File

@ -1,11 +1,10 @@
package main
import (
"log"
"net/http"
"google.golang.org/grpc"
"git.loafle.net/overflow/overflow_websocket_service/service"
"git.loafle.net/overflow/overflow_service_websocket/protocol/jsonrpc"
"git.loafle.net/overflow/overflow_service_websocket/server"
)
var (
@ -13,38 +12,17 @@ var (
)
func main() {
connPool, err := service.New(10, 30, func() (*grpc.ClientConn, error) {
return grpc.Dial(*contentGrpcAddr, grpc.WithDialer(dialer), grpc.WithInsecure())
})
if err != nil {
log.Fatalf("create service connection pool error: %v\n", err)
}
defer connPool.Destroy()
connPool.Ping = func(conn *grpc.ClientConn) bool {
// check connection status
return true
}
connPool.Close = func(conn *grpc.ClientConn) {
// close connection
conn.Close()
}
ws := server.New(server.Options{})
ws.RegistProtocol("jsonrpc", jsonrpc.NewHandler())
ws.OnConnection(func(c server.Client) {
c.OnDisconnect(func(Client) {
// ws := websocket.New(websocket.Config{})
// http.HandleFunc("/ws", ws.Handler())
})
})
http.HandleFunc("/ws", ws.HTTPHandler())
// ws.OnConnection(handleWebsocketConnection)
// http.ListenAndServe(host, nil)
}
func handleWebsocketConnection(c websocket.Connection) {
}
func callGrpcService(connPool *service.Pool) {
conn, err := connPool.Get()
if err != nil {
log.Printf("get connection error: %v\n", err)
}
// * Important
defer connPool.Put(conn)
http.ListenAndServe(host, nil)
}

167
pool/grpc/service.go Normal file
View File

@ -0,0 +1,167 @@
package grpc
import (
"fmt"
"log"
"sync"
"git.loafle.net/overflow/overflow_service_websocket/pool"
"google.golang.org/grpc"
)
type ClientCreator func(*grpc.ClientConn) (interface{}, error)
type ConnectionCreator func() (*grpc.ClientConn, error)
type grpcPool struct {
creator ClientCreator
connectionCreator ConnectionCreator
clients chan interface{}
connections map[interface{}]*grpc.ClientConn
mtx sync.Mutex
initCapacity int
maxCapacity int
}
func New(initCap int, maxCap int, creator ClientCreator, connectionCreator ConnectionCreator) (pool.Pool, error) {
if initCap < 0 || maxCap <= 0 || initCap > maxCap {
return nil, fmt.Errorf("invalid capacity settings")
}
if creator == nil {
return nil, fmt.Errorf("invalid ClientCreator settings")
}
if connectionCreator == nil {
return nil, fmt.Errorf("invalid ConnectionCreator settings")
}
p := &grpcPool{
initCapacity: initCap,
maxCapacity: maxCap,
clients: make(chan interface{}, initCap),
connections: make(map[interface{}]*grpc.ClientConn, initCap),
creator: creator,
connectionCreator: connectionCreator,
}
for i := 0; i < initCap; i++ {
c, err := p.create()
if err != nil {
return p, err
}
p.clients <- c
}
return p, nil
}
func (p *grpcPool) Capacity() int {
return cap(p.clients)
}
func (p *grpcPool) Available() int {
return len(p.clients)
}
func (p *grpcPool) Get() (interface{}, error) {
if p.clients == nil {
// pool aleardy destroyed, returns new client
return p.create()
}
for {
select {
case c := <-p.clients:
return c, nil
default:
return p.create()
}
}
}
func (p *grpcPool) Put(c interface{}) {
select {
case p.clients <- c:
return
default:
// pool is full, close passed connection
p.destroy(c)
return
}
}
func (p *grpcPool) Destroy() {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.clients == nil {
// pool aleardy destroyed
return
}
close(p.clients)
for c := range p.clients {
p.destroy(c)
}
p.clients = nil
}
func (p *grpcPool) putConnection(c interface{}, conn *grpc.ClientConn) error {
if nil != p.connections[c] {
return fmt.Errorf("Connection alread exist")
}
p.connections[c] = conn
return nil
}
func (p *grpcPool) removeConnection(c interface{}) (*grpc.ClientConn, error) {
if nil == p.connections[c] {
return nil, fmt.Errorf("Connection is not exist")
}
conn := p.connections[c]
delete(p.connections, conn)
return conn, nil
}
func (p *grpcPool) create() (interface{}, error) {
if p.creator == nil {
return nil, fmt.Errorf("ClientCreator is nil, can not create client")
}
var err error
conn, err := p.connectionCreate()
if nil != err {
return nil, err
}
c, err := p.creator(conn)
if nil != err {
err = conn.Close()
return nil, err
}
err = p.putConnection(c, conn)
if nil != err {
err = conn.Close()
return nil, err
}
return c, nil
}
func (p *grpcPool) destroy(c interface{}) {
conn, err := p.removeConnection(c)
if nil != err {
log.Println(fmt.Errorf("%v", err))
return
}
err = conn.Close()
if nil != err {
log.Println(fmt.Errorf("%v", err))
return
}
}
func (p *grpcPool) connectionCreate() (*grpc.ClientConn, error) {
if p.connectionCreator == nil {
return nil, fmt.Errorf("ConnectionCreator is nil, can not create connection")
}
return p.connectionCreator()
}

9
pool/pool.go Normal file
View File

@ -0,0 +1,9 @@
package pool
type Pool interface {
Get() (interface{}, error)
Put(interface{})
Destroy()
Capacity() int
Available() int
}

35
protocol/hadler.go Normal file
View File

@ -0,0 +1,35 @@
package protocol
import "errors"
type ErrorCode int
const (
E_PARSE ErrorCode = -32700
E_INVALID_REQ ErrorCode = -32600
E_NO_METHOD ErrorCode = -32601
E_BAD_PARAMS ErrorCode = -32602
E_INTERNAL ErrorCode = -32603
E_SERVER ErrorCode = -32000
)
var ErrNullResult = errors.New("result is null")
type Error struct {
Code ErrorCode `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
type Handler interface {
Handle([]byte) ([]byte, *Error)
}
func NewError(code ErrorCode, err error, data interface{}) *Error {
e := &Error{
Code: code,
Message: err.Error(),
Data: data,
}
return e
}

94
protocol/jsonrpc/rpc.go Normal file
View File

@ -0,0 +1,94 @@
package jsonrpc
import (
"encoding/json"
"log"
"strings"
"golang.org/x/net/context"
"google.golang.org/grpc"
serverGrpc "git.loafle.net/overflow/overflow_api_server/golang"
"git.loafle.net/overflow/overflow_service_websocket/pool"
grpcPool "git.loafle.net/overflow/overflow_service_websocket/pool/grpc"
"git.loafle.net/overflow/overflow_service_websocket/protocol"
)
var (
host = "localhost:8080"
)
type Request struct {
Method string `json:"method"`
Params []string `json:"params,omitempty"`
}
type Response struct {
Result string `json:"result,omitempty"`
}
// NewHandler returns a new JSON RPC handler.
func NewHandler() protocol.Handler {
servicePool, err := grpcPool.New(10, 30,
func(conn *grpc.ClientConn) (interface{}, error) {
return serverGrpc.NewOverflowApiServerClient(conn), nil
},
func() (*grpc.ClientConn, error) {
return grpc.Dial(host, grpc.WithInsecure())
},
)
if nil != err {
log.Fatal(err)
return nil
}
h := &handler{
pool: servicePool,
}
return h
}
// Codec creates a CodecRequest to process each request.
type handler struct {
pool pool.Pool
}
// NewRequest returns a CodecRequest.
func (h *handler) Handle(data []byte) ([]byte, *protocol.Error) {
req := new(Request)
err := json.Unmarshal(data, &req)
if nil != err {
return nil, protocol.NewError(protocol.E_PARSE, err, nil)
}
parts := strings.Split(req.Method, ".")
si := &serverGrpc.ServerInput{
Target: parts[0],
Method: parts[1],
Params: req.Params,
}
c, err := h.pool.Get()
if nil != err {
return nil, protocol.NewError(protocol.E_INTERNAL, err, nil)
}
defer h.pool.Put(c)
client := c.(serverGrpc.OverflowApiServerClient)
out, err := client.Exec(context.Background(), si)
if err != nil {
return nil, protocol.NewError(protocol.E_SERVER, err, err)
}
res := &Response{
ID: req.ID,
Result: out.Result,
}
r, err := json.Marshal(res)
if err != nil {
return nil, protocol.NewError(protocol.E_INTERNAL, err, nil)
}
return r, nil
}

243
server/client.go Normal file
View File

@ -0,0 +1,243 @@
package server
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"git.loafle.net/overflow/overflow_service_websocket/protocol"
"github.com/gorilla/websocket"
)
type ClientStatus uint8
const (
CONNECTED ClientStatus = iota + 1
DISCONNECTED
)
type (
// OnDisconnectFunc is callback function that used when client is disconnected
OnDisconnectFunc func(Client)
// OnErrorFunc is callback function that used when error occurred
OnErrorFunc func(error)
)
// Client is interface
type Client interface {
ID() string
HTTPRequest() *http.Request
Conn() Connection
Disconnect() error
OnDisconnect(OnDisconnectFunc)
OnError(OnErrorFunc)
initialize() error
destroy() error
}
type client struct {
id string
status ClientStatus
messageType int
server Server
httpRequest *http.Request
conn Connection
pingTicker *time.Ticker
writeMTX sync.Mutex
onDisconnectListeners []OnDisconnectFunc
onErrorListeners []OnErrorFunc
}
var _ Client = &client{}
func newClient(s Server, r *http.Request, conn Connection, clientID string) Client {
c := &client{
id: clientID,
status: CONNECTED,
messageType: websocket.TextMessage,
server: s,
httpRequest: r,
conn: conn,
onDisconnectListeners: make([]OnDisconnectFunc, 0),
onErrorListeners: make([]OnErrorFunc, 0),
}
if s.GetOptions().BinaryMessage {
c.messageType = websocket.BinaryMessage
}
return c
}
func (c *client) ID() string {
return c.id
}
func (c *client) HTTPRequest() *http.Request {
return c.httpRequest
}
func (c *client) Conn() Connection {
return c.conn
}
func (c *client) Disconnect() error {
return c.server.Disconnect(c.ID())
}
func (c *client) OnDisconnect(cb OnDisconnectFunc) {
c.onDisconnectListeners = append(c.onDisconnectListeners, cb)
}
func (c *client) OnError(cb OnErrorFunc) {
c.onErrorListeners = append(c.onErrorListeners, cb)
}
func (c *client) initialize() error {
c.status = CONNECTED
c.startPing()
c.startReading()
return nil
}
func (c *client) destroy() error {
c.status = DISCONNECTED
c.pingTicker.Stop()
for _, cb := range c.onDisconnectListeners {
cb(c)
}
return c.conn.Close()
}
func (c *client) startPing() {
c.pingTicker = time.NewTicker(c.server.GetOptions().PingPeriod)
go func() {
for {
<-c.pingTicker.C
if err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.server.GetOptions().PingTimeout)); err != nil {
log.Println("ping:", err)
}
}
}()
}
func (c *client) startReading() {
hasReadTimeout := c.server.GetOptions().ReadTimeout > 0
c.conn.SetReadLimit(c.server.GetOptions().MaxMessageSize)
c.conn.SetPongHandler(func(message string) error {
if hasReadTimeout {
c.conn.SetReadDeadline(time.Now().Add(c.server.GetOptions().PongTimeout))
}
return nil
})
defer func() {
c.Disconnect()
}()
for {
if hasReadTimeout {
c.conn.SetReadDeadline(time.Now().Add(c.server.GetOptions().ReadTimeout))
}
// messageType, data, err := c.conn.ReadMessage()
messageType, r, err := c.conn.NextReader()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
c.fireError(err)
}
break
} else {
c.onMessageReceived(messageType, r)
}
}
}
func (c *client) onMessageReceived(messageType int, r io.Reader) {
req := new(Request)
err := json.NewDecoder(r).Decode(req)
if err != nil {
}
h := c.server.ProtocolHandler(req.Protocol)
if nil == h {
}
hr, perr := h.Handle(*req.Body)
if perr != nil {
c.writeError(req, perr)
}
c.writeResult(req, hr)
}
func (c *client) fireError(err error) {
for _, cb := range c.onErrorListeners {
cb(err)
}
}
func (c *client) writeResult(r *Request, data []byte) {
c.writeMTX.Lock()
if writeTimeout := c.server.GetOptions().WriteTimeout; writeTimeout > 0 {
// set the write deadline based on the configuration
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Println(fmt.Errorf("%v", err))
}
raw := json.RawMessage(data)
res := &Response{
Protocol: r.Protocol,
ID: r.ID,
Body: &raw,
Error: nil,
}
jRes, err := json.Marshal(res)
if nil != err {
}
err = c.conn.WriteMessage(c.messageType, jRes)
c.writeMTX.Unlock()
if nil != err {
c.Disconnect()
}
}
func (c *client) writeError(r *Request, perr *protocol.Error) {
c.writeMTX.Lock()
if writeTimeout := c.server.GetOptions().WriteTimeout; writeTimeout > 0 {
// set the write deadline based on the configuration
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Println(fmt.Errorf("%v", err))
}
res := &Response{
Protocol: r.Protocol,
ID: r.ID,
Body: nil,
Error: perr,
}
jRes, err := json.Marshal(res)
if nil != err {
}
err = c.conn.WriteMessage(c.messageType, jRes)
c.writeMTX.Unlock()
if nil != err {
c.Disconnect()
}
}

View File

@ -1,4 +1,4 @@
package websocket
package server
import (
"io"

View File

@ -1,4 +1,4 @@
package websocket
package server
import (
"net/http"
@ -49,8 +49,8 @@ func (os OptionSet) Set(o *Options) {
// Options is configuration of the websocket server
type Options struct {
Error func(res http.ResponseWriter, req *http.Request, status int, reason error)
CheckOrigin func(req *http.Request) bool
OnError func(res http.ResponseWriter, req *http.Request, status int, reason error)
OnCheckOrigin func(req *http.Request) bool
WriteTimeout time.Duration
ReadTimeout time.Duration
PongTimeout time.Duration
@ -65,8 +65,8 @@ type Options struct {
// Set is the func which makes the OptionSet an OptionSetter, this is used mostly
func (o *Options) Set(main *Options) {
main.Error = o.Error
main.CheckOrigin = o.CheckOrigin
main.OnError = o.OnError
main.OnCheckOrigin = o.OnCheckOrigin
main.WriteTimeout = o.WriteTimeout
main.ReadTimeout = o.ReadTimeout
main.PongTimeout = o.PongTimeout
@ -79,18 +79,18 @@ func (o *Options) Set(main *Options) {
main.IDGenerator = o.IDGenerator
}
// Error sets the error handler
func Error(val func(res http.ResponseWriter, req *http.Request, status int, reason error)) OptionSet {
// OnError sets the error handler
func OnError(val func(res http.ResponseWriter, req *http.Request, status int, reason error)) OptionSet {
return func(o *Options) {
o.Error = val
o.OnError = val
}
}
// CheckOrigin sets a handler which will check if different origin(domains) are allowed to contact with
// OnCheckOrigin sets a handler which will check if different origin(domains) are allowed to contact with
// the websocket server
func CheckOrigin(val func(req *http.Request) bool) OptionSet {
func OnCheckOrigin(val func(req *http.Request) bool) OptionSet {
return func(o *Options) {
o.CheckOrigin = val
o.OnCheckOrigin = val
}
}
@ -207,13 +207,13 @@ func (o *Options) Validate() {
o.WriteBufferSize = DefaultWriteBufferSize
}
if o.Error == nil {
o.Error = func(res http.ResponseWriter, req *http.Request, status int, reason error) {
if o.OnError == nil {
o.OnError = func(res http.ResponseWriter, req *http.Request, status int, reason error) {
}
}
if o.CheckOrigin == nil {
o.CheckOrigin = func(req *http.Request) bool {
if o.OnCheckOrigin == nil {
o.OnCheckOrigin = func(req *http.Request) bool {
return true
}
}

20
server/protocol.go Normal file
View File

@ -0,0 +1,20 @@
package server
import (
"encoding/json"
"git.loafle.net/overflow/overflow_service_websocket/protocol"
)
type Request struct {
Protocol string `json:"protocol"`
ID *json.RawMessage `json:"id"`
Body *json.RawMessage `json:"body"`
}
type Response struct {
Protocol string `json:"protocol"`
ID *json.RawMessage `json:"id"`
Error *protocol.Error `json:"error,omitempty"`
Body *json.RawMessage `json:"body"`
}

View File

@ -1,11 +1,13 @@
package websocket
package server
import (
"fmt"
"log"
"net/http"
"strings"
"sync"
"git.loafle.net/overflow/overflow_service_websocket/protocol"
"github.com/gorilla/websocket"
)
@ -17,12 +19,15 @@ type (
// Server is the websocket server,
// listens on the config's port, the critical part is the event OnConnection
type Server interface {
Set(...OptionSetter)
Handler() http.Handler
SetOptions(...OptionSetter)
GetOptions() *Options
RegistProtocol(protocol string, h protocol.Handler)
ProtocolHandler(protocol string) protocol.Handler
HTTPHandler() http.Handler
HandleConnection(*http.Request, Connection)
OnConnection(cb OnConnectionFunc)
IsConnected(clientID string) bool
GetSocket(clientID string) Client
GetClient(clientID string) Client
Disconnect(clientID string) error
}
@ -31,6 +36,7 @@ type server struct {
clients map[string]Client
clientMTX sync.Mutex
onConnectionListeners []OnConnectionFunc
protocols map[string]protocol.Handler
}
var _ Server = &server{}
@ -50,18 +56,35 @@ func newServer(setters ...OptionSetter) Server {
s := &server{
clients: make(map[string]Client, 100),
onConnectionListeners: make([]OnConnectionFunc, 0),
protocols: make(map[string]protocol.Handler, 0),
}
s.Set(setters...)
s.SetOptions(setters...)
return s
}
// Set is function that set option values
func Set(setters ...OptionSetter) {
defaultServer.Set(setters...)
func RegistProtocol(protocol string, h protocol.Handler) {
defaultServer.RegistProtocol(protocol, h)
}
func (s *server) Set(setters ...OptionSetter) {
func (s *server) RegistProtocol(protocol string, h protocol.Handler) {
s.protocols[strings.ToLower(protocol)] = h
}
func ProtocolHandler(protocol string) protocol.Handler {
return defaultServer.ProtocolHandler(protocol)
}
func (s *server) ProtocolHandler(protocol string) protocol.Handler {
return s.protocols[protocol]
}
// SetOptions is function that set option values
func SetOptions(setters ...OptionSetter) {
defaultServer.SetOptions(setters...)
}
func (s *server) SetOptions(setters ...OptionSetter) {
for _, setter := range setters {
setter.Set(s.options)
}
@ -69,19 +92,28 @@ func (s *server) Set(setters ...OptionSetter) {
s.options.Validate()
}
// Handler is the function that used on http request
func Handler() http.Handler {
return defaultServer.Handler()
// GetOptions is function that get option values
func GetOptions() *Options {
return defaultServer.GetOptions()
}
func (s *server) Handler() http.Handler {
func (s *server) GetOptions() *Options {
return s.options
}
// Handler is the function that used on http request
func HTTPHandler() http.Handler {
return defaultServer.HTTPHandler()
}
func (s *server) HTTPHandler() http.Handler {
o := s.options
upgrader := websocket.Upgrader{
ReadBufferSize: o.ReadBufferSize,
WriteBufferSize: o.WriteBufferSize,
Error: o.Error,
CheckOrigin: o.CheckOrigin,
Error: o.OnError,
CheckOrigin: o.OnCheckOrigin,
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -110,6 +142,8 @@ func (s *server) HandleConnection(r *http.Request, conn Connection) {
for i := range s.onConnectionListeners {
s.onConnectionListeners[i](c)
}
c.initialize()
}
// OnConnection is function that add the callback when client is connected to default Server
@ -131,12 +165,12 @@ func (s *server) IsConnected(clientID string) bool {
return soc != nil
}
// GetSocket is function that return client instance
func GetSocket(clientID string) Client {
return defaultServer.GetSocket(clientID)
// GetClient is function that return client instance
func GetClient(clientID string) Client {
return defaultServer.GetClient(clientID)
}
func (s *server) GetSocket(clientID string) Client {
func (s *server) GetClient(clientID string) Client {
return s.clients[clientID]
}

View File

@ -1,104 +0,0 @@
package service
import (
"fmt"
"sync"
grpc "google.golang.org/grpc"
)
type ConnectionGenerator func() (*grpc.ClientConn, error)
type ConnectionChecker func(*grpc.ClientConn) bool
type ConnectionCloser func(*grpc.ClientConn)
type Pool struct {
New ConnectionGenerator
Ping ConnectionChecker
Close ConnectionCloser
Get func() (*grpc.ClientConn, error)
Put func(conn *grpc.ClientConn)
Destroy func()
store chan *grpc.ClientConn
mtx sync.Mutex
initCapacity int
maxCapacity int
}
func New(initCap int, maxCap int, connector ConnectionGenerator) (*Pool, error) {
if initCap < 0 || maxCap <= 0 || initCap > maxCap {
return nil, fmt.Errorf("invalid capacity settings")
}
p := new(Pool)
p.store = make(chan *grpc.ClientConn, maxCap)
if connector != nil {
p.New = connector
}
for i := 0; i < initCap; i++ {
conn, err := p.create()
if err != nil {
return p, err
}
p.store <- conn
}
return p, nil
}
func (p *Pool) Len() int {
return len(p.store)
}
func (p *Pool) Get() (*grpc.ClientConn, error) {
if p.store == nil {
// pool aleardy destroyed, returns new connection
return p.create()
}
for {
select {
case conn := <-p.store:
if p.Ping != nil && p.Ping(conn) == false {
continue
}
return conn, nil
default:
return p.create()
}
}
}
func (p *Pool) Put(conn *grpc.ClientConn) {
select {
case p.store <- conn:
return
default:
// pool is full, close passed connection
if p.Close != nil {
p.Close(conn)
}
return
}
}
func (p *Pool) Destroy() {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.store == nil {
// pool aleardy destroyed
return
}
close(p.store)
for conn := range p.store {
if p.Close != nil {
p.Close(conn)
}
}
p.store = nil
}
func (p *Pool) create() (*grpc.ClientConn, error) {
if p.New == nil {
return nil, fmt.Errorf("Pool.New is nil, can not create connection")
}
return p.New()
}

View File

@ -1,195 +0,0 @@
package websocket
import (
"fmt"
"log"
"net/http"
"sync"
"time"
gWebsocket "github.com/gorilla/websocket"
)
type ClientStatus uint8
const (
CONNECTED ClientStatus = iota + 1
DISCONNECTED
)
type (
// OnDisconnectFunc is callback function that used when client is disconnected
OnDisconnectFunc func(Client)
// OnErrorFunc is callback function that used when error occurred
OnErrorFunc func(string)
// OnMessageFunc is callback function that receives messages from client
OnMessageFunc func([]byte)
// OnFunc is callback function that particular event which fires when a message to this event received
OnFunc interface{}
)
// Client is interface
type Client interface {
ID() string
HTTPRequest() *http.Request
Conn() Connection
Disconnect() error
OnMessage(OnMessageFunc)
OnError(OnErrorFunc)
OnDisconnect(OnDisconnectFunc)
On(string, OnFunc)
initialize() error
destroy() error
}
type client struct {
id string
status ClientStatus
messageType int
server *server
httpRequest *http.Request
conn Connection
pingTicker *time.Ticker
writeMTX sync.Mutex
onMessageListeners []OnMessageFunc
onErrorListeners []OnErrorFunc
onDisconnectListeners []OnDisconnectFunc
onListeners map[string][]OnFunc
}
var _ Client = &client{}
func newClient(s *server, r *http.Request, conn Connection, clientID string) Client {
c := &client{
id: clientID,
status: CONNECTED,
messageType: gWebsocket.TextMessage,
server: s,
httpRequest: r,
conn: conn,
onMessageListeners: make([]OnMessageFunc, 0),
onErrorListeners: make([]OnErrorFunc, 0),
onDisconnectListeners: make([]OnDisconnectFunc, 0),
onListeners: make(map[string][]OnFunc),
}
if s.options.BinaryMessage {
c.messageType = gWebsocket.BinaryMessage
}
return c
}
func (c *client) ID() string {
return c.id
}
func (c *client) HTTPRequest() *http.Request {
return c.httpRequest
}
func (c *client) Conn() Connection {
return c.conn
}
func (c *client) Disconnect() error {
return c.server.Disconnect(c.ID())
}
func (c *client) OnDisconnect(cb OnDisconnectFunc) {
c.onDisconnectListeners = append(c.onDisconnectListeners, cb)
}
func (c *client) OnError(cb OnErrorFunc) {
c.onErrorListeners = append(c.onErrorListeners, cb)
}
func (c *client) OnMessage(cb OnMessageFunc) {
c.onMessageListeners = append(c.onMessageListeners, cb)
}
func (c *client) On(event string, cb OnFunc) {
if c.onListeners[event] == nil {
c.onListeners[event] = make([]OnFunc, 0)
}
c.onListeners[event] = append(c.onListeners[event], cb)
}
func (c *client) initialize() error {
c.startPing()
}
func (c *client) destroy() error {
c.status = DISCONNECTED
for i := range c.onDisconnectListeners {
c.onDisconnectListeners[i](c)
}
return c.conn.Close()
}
func (c *client) startPing() {
c.pingTicker = time.NewTicker(c.server.options.PingPeriod)
go func() {
defer c.pingTicker.Stop()
for {
<-c.pingTicker.C
if err := c.conn.WriteControl(gWebsocket.PingMessage, []byte{}, time.Now().Add(c.server.options.PingTimeout)); err != nil {
log.Println("ping:", err)
}
}
}()
}
func (c *client) startReading() {
hasReadTimeout := c.server.options.ReadTimeout > 0
c.conn.SetReadLimit(c.server.options.MaxMessageSize)
c.conn.SetPongHandler(func(message string) error {
if hasReadTimeout {
c.conn.SetReadDeadline(time.Now().Add(c.server.options.PongTimeout))
}
return nil
})
defer func() {
c.Disconnect()
}()
for {
if hasReadTimeout {
c.conn.SetReadDeadline(time.Now().Add(c.server.options.ReadTimeout))
}
messageType, data, err := c.conn.ReadMessage()
if err != nil {
if gWebsocket.IsUnexpectedCloseError(err, gWebsocket.CloseGoingAway) {
c.EmitError(err.Error())
}
break
} else {
c.onMessageReceived(messageType, data)
}
}
}
func (c *client) onMessageReceived(messageType int, data []byte) {
}
func (c *client) write(messageType int, data []byte) {
c.writeMTX.Lock()
if writeTimeout := c.server.options.WriteTimeout; writeTimeout > 0 {
// set the write deadline based on the configuration
err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
log.Println(fmt.Errorf("%v", err))
}
err := c.conn.WriteMessage(messageType, data)
c.writeMTX.Unlock()
if nil != err {
c.Disconnect()
}
}