diff --git a/glide.yaml b/glide.yaml index fb75d63..6960547 100644 --- a/glide.yaml +++ b/glide.yaml @@ -6,6 +6,13 @@ import: version: ^1.2.0 - package: google.golang.org/grpc version: ^1.4.2 + subpackages: + - health/grpc_health_v1 - package: golang.org/x/net subpackages: - websocket +- package: github.com/gorilla/rpc + version: ^1.1.0 +- package: git.loafle.net/overflow/overflow_api_server + subpackages: + - golang diff --git a/main.go b/main.go index 32199c8..1c1bb02 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,10 @@ package main import ( - "log" + "net/http" - "google.golang.org/grpc" - - "git.loafle.net/overflow/overflow_websocket_service/service" + "git.loafle.net/overflow/overflow_service_websocket/protocol/jsonrpc" + "git.loafle.net/overflow/overflow_service_websocket/server" ) var ( @@ -13,38 +12,17 @@ var ( ) func main() { - connPool, err := service.New(10, 30, func() (*grpc.ClientConn, error) { - return grpc.Dial(*contentGrpcAddr, grpc.WithDialer(dialer), grpc.WithInsecure()) - }) - if err != nil { - log.Fatalf("create service connection pool error: %v\n", err) - } - defer connPool.Destroy() - connPool.Ping = func(conn *grpc.ClientConn) bool { - // check connection status - return true - } - connPool.Close = func(conn *grpc.ClientConn) { - // close connection - conn.Close() - } + ws := server.New(server.Options{}) + ws.RegistProtocol("jsonrpc", jsonrpc.NewHandler()) + ws.OnConnection(func(c server.Client) { + c.OnDisconnect(func(Client) { - // ws := websocket.New(websocket.Config{}) - // http.HandleFunc("/ws", ws.Handler()) + }) + }) + + http.HandleFunc("/ws", ws.HTTPHandler()) // ws.OnConnection(handleWebsocketConnection) - // http.ListenAndServe(host, nil) -} - -func handleWebsocketConnection(c websocket.Connection) { -} - -func callGrpcService(connPool *service.Pool) { - conn, err := connPool.Get() - if err != nil { - log.Printf("get connection error: %v\n", err) - } - // * Important - defer connPool.Put(conn) + http.ListenAndServe(host, nil) } diff --git a/pool/grpc/service.go b/pool/grpc/service.go new file mode 100644 index 0000000..e33b58c --- /dev/null +++ b/pool/grpc/service.go @@ -0,0 +1,167 @@ +package grpc + +import ( + "fmt" + "log" + "sync" + + "git.loafle.net/overflow/overflow_service_websocket/pool" + "google.golang.org/grpc" +) + +type ClientCreator func(*grpc.ClientConn) (interface{}, error) +type ConnectionCreator func() (*grpc.ClientConn, error) + +type grpcPool struct { + creator ClientCreator + connectionCreator ConnectionCreator + clients chan interface{} + connections map[interface{}]*grpc.ClientConn + mtx sync.Mutex + initCapacity int + maxCapacity int +} + +func New(initCap int, maxCap int, creator ClientCreator, connectionCreator ConnectionCreator) (pool.Pool, error) { + if initCap < 0 || maxCap <= 0 || initCap > maxCap { + return nil, fmt.Errorf("invalid capacity settings") + } + + if creator == nil { + return nil, fmt.Errorf("invalid ClientCreator settings") + } + if connectionCreator == nil { + return nil, fmt.Errorf("invalid ConnectionCreator settings") + } + + p := &grpcPool{ + initCapacity: initCap, + maxCapacity: maxCap, + clients: make(chan interface{}, initCap), + connections: make(map[interface{}]*grpc.ClientConn, initCap), + creator: creator, + connectionCreator: connectionCreator, + } + + for i := 0; i < initCap; i++ { + c, err := p.create() + if err != nil { + return p, err + } + p.clients <- c + } + return p, nil +} + +func (p *grpcPool) Capacity() int { + return cap(p.clients) +} + +func (p *grpcPool) Available() int { + return len(p.clients) +} + +func (p *grpcPool) Get() (interface{}, error) { + if p.clients == nil { + // pool aleardy destroyed, returns new client + return p.create() + } + for { + select { + case c := <-p.clients: + return c, nil + default: + return p.create() + } + } +} + +func (p *grpcPool) Put(c interface{}) { + select { + case p.clients <- c: + return + default: + // pool is full, close passed connection + p.destroy(c) + return + } +} + +func (p *grpcPool) Destroy() { + p.mtx.Lock() + defer p.mtx.Unlock() + if p.clients == nil { + // pool aleardy destroyed + return + } + close(p.clients) + for c := range p.clients { + p.destroy(c) + } + p.clients = nil +} + +func (p *grpcPool) putConnection(c interface{}, conn *grpc.ClientConn) error { + if nil != p.connections[c] { + return fmt.Errorf("Connection alread exist") + } + p.connections[c] = conn + return nil +} + +func (p *grpcPool) removeConnection(c interface{}) (*grpc.ClientConn, error) { + if nil == p.connections[c] { + return nil, fmt.Errorf("Connection is not exist") + } + conn := p.connections[c] + delete(p.connections, conn) + return conn, nil +} + +func (p *grpcPool) create() (interface{}, error) { + if p.creator == nil { + return nil, fmt.Errorf("ClientCreator is nil, can not create client") + } + + var err error + + conn, err := p.connectionCreate() + if nil != err { + return nil, err + } + + c, err := p.creator(conn) + if nil != err { + err = conn.Close() + return nil, err + } + + err = p.putConnection(c, conn) + if nil != err { + err = conn.Close() + return nil, err + } + + return c, nil +} + +func (p *grpcPool) destroy(c interface{}) { + + conn, err := p.removeConnection(c) + if nil != err { + log.Println(fmt.Errorf("%v", err)) + return + } + err = conn.Close() + if nil != err { + log.Println(fmt.Errorf("%v", err)) + return + } +} + +func (p *grpcPool) connectionCreate() (*grpc.ClientConn, error) { + if p.connectionCreator == nil { + return nil, fmt.Errorf("ConnectionCreator is nil, can not create connection") + } + return p.connectionCreator() +} diff --git a/pool/pool.go b/pool/pool.go new file mode 100644 index 0000000..8b51b2c --- /dev/null +++ b/pool/pool.go @@ -0,0 +1,9 @@ +package pool + +type Pool interface { + Get() (interface{}, error) + Put(interface{}) + Destroy() + Capacity() int + Available() int +} diff --git a/protocol/hadler.go b/protocol/hadler.go new file mode 100644 index 0000000..3a2f4aa --- /dev/null +++ b/protocol/hadler.go @@ -0,0 +1,35 @@ +package protocol + +import "errors" + +type ErrorCode int + +const ( + E_PARSE ErrorCode = -32700 + E_INVALID_REQ ErrorCode = -32600 + E_NO_METHOD ErrorCode = -32601 + E_BAD_PARAMS ErrorCode = -32602 + E_INTERNAL ErrorCode = -32603 + E_SERVER ErrorCode = -32000 +) + +var ErrNullResult = errors.New("result is null") + +type Error struct { + Code ErrorCode `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +type Handler interface { + Handle([]byte) ([]byte, *Error) +} + +func NewError(code ErrorCode, err error, data interface{}) *Error { + e := &Error{ + Code: code, + Message: err.Error(), + Data: data, + } + return e +} diff --git a/protocol/jsonrpc/rpc.go b/protocol/jsonrpc/rpc.go new file mode 100644 index 0000000..4949d9d --- /dev/null +++ b/protocol/jsonrpc/rpc.go @@ -0,0 +1,94 @@ +package jsonrpc + +import ( + "encoding/json" + "log" + "strings" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + serverGrpc "git.loafle.net/overflow/overflow_api_server/golang" + "git.loafle.net/overflow/overflow_service_websocket/pool" + grpcPool "git.loafle.net/overflow/overflow_service_websocket/pool/grpc" + "git.loafle.net/overflow/overflow_service_websocket/protocol" +) + +var ( + host = "localhost:8080" +) + +type Request struct { + Method string `json:"method"` + Params []string `json:"params,omitempty"` +} + +type Response struct { + Result string `json:"result,omitempty"` +} + +// NewHandler returns a new JSON RPC handler. +func NewHandler() protocol.Handler { + servicePool, err := grpcPool.New(10, 30, + func(conn *grpc.ClientConn) (interface{}, error) { + return serverGrpc.NewOverflowApiServerClient(conn), nil + }, + func() (*grpc.ClientConn, error) { + return grpc.Dial(host, grpc.WithInsecure()) + }, + ) + if nil != err { + log.Fatal(err) + return nil + } + h := &handler{ + pool: servicePool, + } + + return h +} + +// Codec creates a CodecRequest to process each request. +type handler struct { + pool pool.Pool +} + +// NewRequest returns a CodecRequest. +func (h *handler) Handle(data []byte) ([]byte, *protocol.Error) { + req := new(Request) + err := json.Unmarshal(data, &req) + if nil != err { + return nil, protocol.NewError(protocol.E_PARSE, err, nil) + } + + parts := strings.Split(req.Method, ".") + + si := &serverGrpc.ServerInput{ + Target: parts[0], + Method: parts[1], + Params: req.Params, + } + + c, err := h.pool.Get() + if nil != err { + return nil, protocol.NewError(protocol.E_INTERNAL, err, nil) + } + defer h.pool.Put(c) + client := c.(serverGrpc.OverflowApiServerClient) + out, err := client.Exec(context.Background(), si) + if err != nil { + return nil, protocol.NewError(protocol.E_SERVER, err, err) + } + + res := &Response{ + ID: req.ID, + Result: out.Result, + } + + r, err := json.Marshal(res) + if err != nil { + return nil, protocol.NewError(protocol.E_INTERNAL, err, nil) + } + + return r, nil +} diff --git a/server/client.go b/server/client.go new file mode 100644 index 0000000..1248455 --- /dev/null +++ b/server/client.go @@ -0,0 +1,243 @@ +package server + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "sync" + "time" + + "git.loafle.net/overflow/overflow_service_websocket/protocol" + "github.com/gorilla/websocket" +) + +type ClientStatus uint8 + +const ( + CONNECTED ClientStatus = iota + 1 + DISCONNECTED +) + +type ( + // OnDisconnectFunc is callback function that used when client is disconnected + OnDisconnectFunc func(Client) + // OnErrorFunc is callback function that used when error occurred + OnErrorFunc func(error) +) + +// Client is interface +type Client interface { + ID() string + HTTPRequest() *http.Request + Conn() Connection + Disconnect() error + OnDisconnect(OnDisconnectFunc) + OnError(OnErrorFunc) + initialize() error + destroy() error +} + +type client struct { + id string + status ClientStatus + messageType int + server Server + httpRequest *http.Request + conn Connection + pingTicker *time.Ticker + writeMTX sync.Mutex + onDisconnectListeners []OnDisconnectFunc + onErrorListeners []OnErrorFunc +} + +var _ Client = &client{} + +func newClient(s Server, r *http.Request, conn Connection, clientID string) Client { + c := &client{ + id: clientID, + status: CONNECTED, + messageType: websocket.TextMessage, + server: s, + httpRequest: r, + conn: conn, + onDisconnectListeners: make([]OnDisconnectFunc, 0), + onErrorListeners: make([]OnErrorFunc, 0), + } + + if s.GetOptions().BinaryMessage { + c.messageType = websocket.BinaryMessage + } + + return c +} + +func (c *client) ID() string { + return c.id +} + +func (c *client) HTTPRequest() *http.Request { + return c.httpRequest +} + +func (c *client) Conn() Connection { + return c.conn +} + +func (c *client) Disconnect() error { + return c.server.Disconnect(c.ID()) +} + +func (c *client) OnDisconnect(cb OnDisconnectFunc) { + c.onDisconnectListeners = append(c.onDisconnectListeners, cb) +} + +func (c *client) OnError(cb OnErrorFunc) { + c.onErrorListeners = append(c.onErrorListeners, cb) +} + +func (c *client) initialize() error { + c.status = CONNECTED + c.startPing() + c.startReading() + return nil +} + +func (c *client) destroy() error { + c.status = DISCONNECTED + c.pingTicker.Stop() + + for _, cb := range c.onDisconnectListeners { + cb(c) + } + + return c.conn.Close() +} + +func (c *client) startPing() { + c.pingTicker = time.NewTicker(c.server.GetOptions().PingPeriod) + go func() { + for { + <-c.pingTicker.C + if err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.server.GetOptions().PingTimeout)); err != nil { + log.Println("ping:", err) + } + } + }() +} + +func (c *client) startReading() { + hasReadTimeout := c.server.GetOptions().ReadTimeout > 0 + c.conn.SetReadLimit(c.server.GetOptions().MaxMessageSize) + c.conn.SetPongHandler(func(message string) error { + if hasReadTimeout { + c.conn.SetReadDeadline(time.Now().Add(c.server.GetOptions().PongTimeout)) + } + + return nil + }) + + defer func() { + c.Disconnect() + }() + + for { + if hasReadTimeout { + c.conn.SetReadDeadline(time.Now().Add(c.server.GetOptions().ReadTimeout)) + } + + // messageType, data, err := c.conn.ReadMessage() + messageType, r, err := c.conn.NextReader() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { + c.fireError(err) + } + break + } else { + c.onMessageReceived(messageType, r) + } + + } +} + +func (c *client) onMessageReceived(messageType int, r io.Reader) { + req := new(Request) + err := json.NewDecoder(r).Decode(req) + if err != nil { + } + + h := c.server.ProtocolHandler(req.Protocol) + if nil == h { + + } + + hr, perr := h.Handle(*req.Body) + if perr != nil { + c.writeError(req, perr) + } + + c.writeResult(req, hr) +} + +func (c *client) fireError(err error) { + for _, cb := range c.onErrorListeners { + cb(err) + } +} + +func (c *client) writeResult(r *Request, data []byte) { + c.writeMTX.Lock() + if writeTimeout := c.server.GetOptions().WriteTimeout; writeTimeout > 0 { + // set the write deadline based on the configuration + err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + log.Println(fmt.Errorf("%v", err)) + } + + raw := json.RawMessage(data) + + res := &Response{ + Protocol: r.Protocol, + ID: r.ID, + Body: &raw, + Error: nil, + } + + jRes, err := json.Marshal(res) + if nil != err { + } + + err = c.conn.WriteMessage(c.messageType, jRes) + c.writeMTX.Unlock() + + if nil != err { + c.Disconnect() + } +} + +func (c *client) writeError(r *Request, perr *protocol.Error) { + c.writeMTX.Lock() + if writeTimeout := c.server.GetOptions().WriteTimeout; writeTimeout > 0 { + // set the write deadline based on the configuration + err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + log.Println(fmt.Errorf("%v", err)) + } + + res := &Response{ + Protocol: r.Protocol, + ID: r.ID, + Body: nil, + Error: perr, + } + + jRes, err := json.Marshal(res) + if nil != err { + } + + err = c.conn.WriteMessage(c.messageType, jRes) + c.writeMTX.Unlock() + + if nil != err { + c.Disconnect() + } +} diff --git a/websocket/connection.go b/server/connection.go similarity index 98% rename from websocket/connection.go rename to server/connection.go index c514de7..cfc59b2 100644 --- a/websocket/connection.go +++ b/server/connection.go @@ -1,4 +1,4 @@ -package websocket +package server import ( "io" diff --git a/websocket/options.go b/server/options.go similarity index 87% rename from websocket/options.go rename to server/options.go index 2086c12..1468c09 100644 --- a/websocket/options.go +++ b/server/options.go @@ -1,4 +1,4 @@ -package websocket +package server import ( "net/http" @@ -49,8 +49,8 @@ func (os OptionSet) Set(o *Options) { // Options is configuration of the websocket server type Options struct { - Error func(res http.ResponseWriter, req *http.Request, status int, reason error) - CheckOrigin func(req *http.Request) bool + OnError func(res http.ResponseWriter, req *http.Request, status int, reason error) + OnCheckOrigin func(req *http.Request) bool WriteTimeout time.Duration ReadTimeout time.Duration PongTimeout time.Duration @@ -65,8 +65,8 @@ type Options struct { // Set is the func which makes the OptionSet an OptionSetter, this is used mostly func (o *Options) Set(main *Options) { - main.Error = o.Error - main.CheckOrigin = o.CheckOrigin + main.OnError = o.OnError + main.OnCheckOrigin = o.OnCheckOrigin main.WriteTimeout = o.WriteTimeout main.ReadTimeout = o.ReadTimeout main.PongTimeout = o.PongTimeout @@ -79,18 +79,18 @@ func (o *Options) Set(main *Options) { main.IDGenerator = o.IDGenerator } -// Error sets the error handler -func Error(val func(res http.ResponseWriter, req *http.Request, status int, reason error)) OptionSet { +// OnError sets the error handler +func OnError(val func(res http.ResponseWriter, req *http.Request, status int, reason error)) OptionSet { return func(o *Options) { - o.Error = val + o.OnError = val } } -// CheckOrigin sets a handler which will check if different origin(domains) are allowed to contact with +// OnCheckOrigin sets a handler which will check if different origin(domains) are allowed to contact with // the websocket server -func CheckOrigin(val func(req *http.Request) bool) OptionSet { +func OnCheckOrigin(val func(req *http.Request) bool) OptionSet { return func(o *Options) { - o.CheckOrigin = val + o.OnCheckOrigin = val } } @@ -207,13 +207,13 @@ func (o *Options) Validate() { o.WriteBufferSize = DefaultWriteBufferSize } - if o.Error == nil { - o.Error = func(res http.ResponseWriter, req *http.Request, status int, reason error) { + if o.OnError == nil { + o.OnError = func(res http.ResponseWriter, req *http.Request, status int, reason error) { } } - if o.CheckOrigin == nil { - o.CheckOrigin = func(req *http.Request) bool { + if o.OnCheckOrigin == nil { + o.OnCheckOrigin = func(req *http.Request) bool { return true } } diff --git a/server/protocol.go b/server/protocol.go new file mode 100644 index 0000000..66b16fe --- /dev/null +++ b/server/protocol.go @@ -0,0 +1,20 @@ +package server + +import ( + "encoding/json" + + "git.loafle.net/overflow/overflow_service_websocket/protocol" +) + +type Request struct { + Protocol string `json:"protocol"` + ID *json.RawMessage `json:"id"` + Body *json.RawMessage `json:"body"` +} + +type Response struct { + Protocol string `json:"protocol"` + ID *json.RawMessage `json:"id"` + Error *protocol.Error `json:"error,omitempty"` + Body *json.RawMessage `json:"body"` +} diff --git a/websocket/server.go b/server/server.go similarity index 66% rename from websocket/server.go rename to server/server.go index 4f083be..4f7c75e 100644 --- a/websocket/server.go +++ b/server/server.go @@ -1,11 +1,13 @@ -package websocket +package server import ( "fmt" "log" "net/http" + "strings" "sync" + "git.loafle.net/overflow/overflow_service_websocket/protocol" "github.com/gorilla/websocket" ) @@ -17,12 +19,15 @@ type ( // Server is the websocket server, // listens on the config's port, the critical part is the event OnConnection type Server interface { - Set(...OptionSetter) - Handler() http.Handler + SetOptions(...OptionSetter) + GetOptions() *Options + RegistProtocol(protocol string, h protocol.Handler) + ProtocolHandler(protocol string) protocol.Handler + HTTPHandler() http.Handler HandleConnection(*http.Request, Connection) OnConnection(cb OnConnectionFunc) IsConnected(clientID string) bool - GetSocket(clientID string) Client + GetClient(clientID string) Client Disconnect(clientID string) error } @@ -31,6 +36,7 @@ type server struct { clients map[string]Client clientMTX sync.Mutex onConnectionListeners []OnConnectionFunc + protocols map[string]protocol.Handler } var _ Server = &server{} @@ -50,18 +56,35 @@ func newServer(setters ...OptionSetter) Server { s := &server{ clients: make(map[string]Client, 100), onConnectionListeners: make([]OnConnectionFunc, 0), + protocols: make(map[string]protocol.Handler, 0), } - s.Set(setters...) + s.SetOptions(setters...) return s } -// Set is function that set option values -func Set(setters ...OptionSetter) { - defaultServer.Set(setters...) +func RegistProtocol(protocol string, h protocol.Handler) { + defaultServer.RegistProtocol(protocol, h) } -func (s *server) Set(setters ...OptionSetter) { +func (s *server) RegistProtocol(protocol string, h protocol.Handler) { + s.protocols[strings.ToLower(protocol)] = h +} + +func ProtocolHandler(protocol string) protocol.Handler { + return defaultServer.ProtocolHandler(protocol) +} + +func (s *server) ProtocolHandler(protocol string) protocol.Handler { + return s.protocols[protocol] +} + +// SetOptions is function that set option values +func SetOptions(setters ...OptionSetter) { + defaultServer.SetOptions(setters...) +} + +func (s *server) SetOptions(setters ...OptionSetter) { for _, setter := range setters { setter.Set(s.options) } @@ -69,19 +92,28 @@ func (s *server) Set(setters ...OptionSetter) { s.options.Validate() } -// Handler is the function that used on http request -func Handler() http.Handler { - return defaultServer.Handler() +// GetOptions is function that get option values +func GetOptions() *Options { + return defaultServer.GetOptions() } -func (s *server) Handler() http.Handler { +func (s *server) GetOptions() *Options { + return s.options +} + +// Handler is the function that used on http request +func HTTPHandler() http.Handler { + return defaultServer.HTTPHandler() +} + +func (s *server) HTTPHandler() http.Handler { o := s.options upgrader := websocket.Upgrader{ ReadBufferSize: o.ReadBufferSize, WriteBufferSize: o.WriteBufferSize, - Error: o.Error, - CheckOrigin: o.CheckOrigin, + Error: o.OnError, + CheckOrigin: o.OnCheckOrigin, } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -110,6 +142,8 @@ func (s *server) HandleConnection(r *http.Request, conn Connection) { for i := range s.onConnectionListeners { s.onConnectionListeners[i](c) } + + c.initialize() } // OnConnection is function that add the callback when client is connected to default Server @@ -131,12 +165,12 @@ func (s *server) IsConnected(clientID string) bool { return soc != nil } -// GetSocket is function that return client instance -func GetSocket(clientID string) Client { - return defaultServer.GetSocket(clientID) +// GetClient is function that return client instance +func GetClient(clientID string) Client { + return defaultServer.GetClient(clientID) } -func (s *server) GetSocket(clientID string) Client { +func (s *server) GetClient(clientID string) Client { return s.clients[clientID] } diff --git a/service/pool.go b/service/pool.go deleted file mode 100644 index 391fc0c..0000000 --- a/service/pool.go +++ /dev/null @@ -1,104 +0,0 @@ -package service - -import ( - "fmt" - "sync" - - grpc "google.golang.org/grpc" -) - -type ConnectionGenerator func() (*grpc.ClientConn, error) -type ConnectionChecker func(*grpc.ClientConn) bool -type ConnectionCloser func(*grpc.ClientConn) - -type Pool struct { - New ConnectionGenerator - Ping ConnectionChecker - Close ConnectionCloser - Get func() (*grpc.ClientConn, error) - Put func(conn *grpc.ClientConn) - Destroy func() - store chan *grpc.ClientConn - mtx sync.Mutex - initCapacity int - maxCapacity int -} - -func New(initCap int, maxCap int, connector ConnectionGenerator) (*Pool, error) { - if initCap < 0 || maxCap <= 0 || initCap > maxCap { - return nil, fmt.Errorf("invalid capacity settings") - } - p := new(Pool) - p.store = make(chan *grpc.ClientConn, maxCap) - - if connector != nil { - p.New = connector - } - - for i := 0; i < initCap; i++ { - conn, err := p.create() - if err != nil { - return p, err - } - p.store <- conn - } - return p, nil -} - -func (p *Pool) Len() int { - return len(p.store) -} - -func (p *Pool) Get() (*grpc.ClientConn, error) { - if p.store == nil { - // pool aleardy destroyed, returns new connection - return p.create() - } - for { - select { - case conn := <-p.store: - if p.Ping != nil && p.Ping(conn) == false { - continue - } - return conn, nil - default: - return p.create() - } - } -} - -func (p *Pool) Put(conn *grpc.ClientConn) { - select { - case p.store <- conn: - return - default: - // pool is full, close passed connection - if p.Close != nil { - p.Close(conn) - } - return - } -} - -func (p *Pool) Destroy() { - p.mtx.Lock() - defer p.mtx.Unlock() - if p.store == nil { - // pool aleardy destroyed - return - } - close(p.store) - for conn := range p.store { - if p.Close != nil { - p.Close(conn) - } - } - p.store = nil -} - -func (p *Pool) create() (*grpc.ClientConn, error) { - if p.New == nil { - return nil, fmt.Errorf("Pool.New is nil, can not create connection") - } - return p.New() -} diff --git a/websocket/client.go b/websocket/client.go deleted file mode 100644 index d6861bf..0000000 --- a/websocket/client.go +++ /dev/null @@ -1,195 +0,0 @@ -package websocket - -import ( - "fmt" - "log" - "net/http" - "sync" - "time" - - gWebsocket "github.com/gorilla/websocket" -) - -type ClientStatus uint8 - -const ( - CONNECTED ClientStatus = iota + 1 - DISCONNECTED -) - -type ( - // OnDisconnectFunc is callback function that used when client is disconnected - OnDisconnectFunc func(Client) - // OnErrorFunc is callback function that used when error occurred - OnErrorFunc func(string) - // OnMessageFunc is callback function that receives messages from client - OnMessageFunc func([]byte) - // OnFunc is callback function that particular event which fires when a message to this event received - OnFunc interface{} -) - -// Client is interface -type Client interface { - ID() string - HTTPRequest() *http.Request - Conn() Connection - Disconnect() error - OnMessage(OnMessageFunc) - OnError(OnErrorFunc) - OnDisconnect(OnDisconnectFunc) - On(string, OnFunc) - initialize() error - destroy() error -} - -type client struct { - id string - status ClientStatus - messageType int - server *server - httpRequest *http.Request - conn Connection - pingTicker *time.Ticker - writeMTX sync.Mutex - onMessageListeners []OnMessageFunc - onErrorListeners []OnErrorFunc - onDisconnectListeners []OnDisconnectFunc - onListeners map[string][]OnFunc -} - -var _ Client = &client{} - -func newClient(s *server, r *http.Request, conn Connection, clientID string) Client { - c := &client{ - id: clientID, - status: CONNECTED, - messageType: gWebsocket.TextMessage, - server: s, - httpRequest: r, - conn: conn, - onMessageListeners: make([]OnMessageFunc, 0), - onErrorListeners: make([]OnErrorFunc, 0), - onDisconnectListeners: make([]OnDisconnectFunc, 0), - onListeners: make(map[string][]OnFunc), - } - - if s.options.BinaryMessage { - c.messageType = gWebsocket.BinaryMessage - } - - return c -} - -func (c *client) ID() string { - return c.id -} - -func (c *client) HTTPRequest() *http.Request { - return c.httpRequest -} - -func (c *client) Conn() Connection { - return c.conn -} - -func (c *client) Disconnect() error { - return c.server.Disconnect(c.ID()) -} - -func (c *client) OnDisconnect(cb OnDisconnectFunc) { - c.onDisconnectListeners = append(c.onDisconnectListeners, cb) -} - -func (c *client) OnError(cb OnErrorFunc) { - c.onErrorListeners = append(c.onErrorListeners, cb) -} - -func (c *client) OnMessage(cb OnMessageFunc) { - c.onMessageListeners = append(c.onMessageListeners, cb) -} - -func (c *client) On(event string, cb OnFunc) { - if c.onListeners[event] == nil { - c.onListeners[event] = make([]OnFunc, 0) - } - - c.onListeners[event] = append(c.onListeners[event], cb) -} - -func (c *client) initialize() error { - c.startPing() -} - -func (c *client) destroy() error { - c.status = DISCONNECTED - - for i := range c.onDisconnectListeners { - c.onDisconnectListeners[i](c) - } - - return c.conn.Close() -} - -func (c *client) startPing() { - c.pingTicker = time.NewTicker(c.server.options.PingPeriod) - go func() { - defer c.pingTicker.Stop() - for { - <-c.pingTicker.C - if err := c.conn.WriteControl(gWebsocket.PingMessage, []byte{}, time.Now().Add(c.server.options.PingTimeout)); err != nil { - log.Println("ping:", err) - } - } - }() -} - -func (c *client) startReading() { - hasReadTimeout := c.server.options.ReadTimeout > 0 - c.conn.SetReadLimit(c.server.options.MaxMessageSize) - c.conn.SetPongHandler(func(message string) error { - if hasReadTimeout { - c.conn.SetReadDeadline(time.Now().Add(c.server.options.PongTimeout)) - } - - return nil - }) - - defer func() { - c.Disconnect() - }() - - for { - if hasReadTimeout { - c.conn.SetReadDeadline(time.Now().Add(c.server.options.ReadTimeout)) - } - messageType, data, err := c.conn.ReadMessage() - if err != nil { - if gWebsocket.IsUnexpectedCloseError(err, gWebsocket.CloseGoingAway) { - c.EmitError(err.Error()) - } - break - } else { - c.onMessageReceived(messageType, data) - } - - } -} - -func (c *client) onMessageReceived(messageType int, data []byte) { -} - -func (c *client) write(messageType int, data []byte) { - c.writeMTX.Lock() - if writeTimeout := c.server.options.WriteTimeout; writeTimeout > 0 { - // set the write deadline based on the configuration - err := c.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) - log.Println(fmt.Errorf("%v", err)) - } - - err := c.conn.WriteMessage(messageType, data) - c.writeMTX.Unlock() - - if nil != err { - c.Disconnect() - } -}