From c1de188784138a5ec935a9a0bd20a8322671922a Mon Sep 17 00:00:00 2001 From: crusader Date: Mon, 21 Aug 2017 19:23:45 +0900 Subject: [PATCH] ing --- backend/client.go | 26 +++++++ backend/options.go | 26 +++++++ backend/pool.go | 153 ++++++++++++++++++++++++++++++++++++++++++ clients/client.go | 7 ++ clients/context.go | 7 ++ glide.yaml | 5 ++ pubsub/pubsub.go | 4 +- pubsub/redis/redis.go | 27 +++++++- server.go | 30 +++++++++ 9 files changed, 281 insertions(+), 4 deletions(-) create mode 100644 backend/client.go create mode 100644 backend/options.go create mode 100644 backend/pool.go create mode 100644 clients/client.go create mode 100644 clients/context.go diff --git a/backend/client.go b/backend/client.go new file mode 100644 index 0000000..194d259 --- /dev/null +++ b/backend/client.go @@ -0,0 +1,26 @@ +package backend + +import ( + "context" + + "google.golang.org/grpc" +) + +type BackendClient interface { + Exec(ctx context.Context, in *BackendClientInput, opts ...grpc.CallOption) (*BackendClientOutput, error) +} + +type BackendClientInput struct { + Target string + Method string + Params []string +} + +type BackendClientOutput struct { + Result string +} + +type PooledClient interface { + Exec(ctx context.Context, target string, method string, params []string) (string, error) + Close() +} diff --git a/backend/options.go b/backend/options.go new file mode 100644 index 0000000..d28c793 --- /dev/null +++ b/backend/options.go @@ -0,0 +1,26 @@ +package backend + +import ( + "google.golang.org/grpc" +) + +type Options struct { + // Dial is an application supplied function for creating and configuring a + // grpc connection. + // + // The grpc.ClientConn returned from Dial + Dial func() (*grpc.ClientConn, error) + + // NewClient is an application supplied function for creating and configuring a + // client. + // + // The client returned from NewClient + NewClient func(*grpc.ClientConn) (BackendClient, error) + + // Initial number of clients in the pool. + InitCapacity int + + // Maximum number of clients allocated by the pool at a given time. + // When zero, there is no limit on the number of clients in the pool. + MaxCapacity int +} diff --git a/backend/pool.go b/backend/pool.go new file mode 100644 index 0000000..6ee413e --- /dev/null +++ b/backend/pool.go @@ -0,0 +1,153 @@ +package backend + +import ( + "context" + "fmt" + "sync" + + "google.golang.org/grpc" +) + +type Pool interface { + // Get gets a client. The application must close the returned client. + // This method always returns a valid client so that applications can defer + // error handling to the first use of the client. If there is an error + // getting an underlying client, then the client Err, Do, Send, Flush + // and Receive methods return that error. + Get() (PooledClient, error) + + // Capacity returns the number of maximum clients in the pool. + Capacity() int + + // Available returns the number of avaliable clients in the pool. + Available() int + + // Destroy releases the resources used by the pool. + Destroy() +} + +type pool struct { + dial func() (*grpc.ClientConn, error) + newClient func(*grpc.ClientConn) (BackendClient, error) + initCapacity int + maxCapacity int + + conn *grpc.ClientConn + mtx sync.Mutex + clients chan PooledClient +} + +func NewPool(o Options) (Pool, error) { + if o.Dial == nil { + return nil, fmt.Errorf("invalid Dial settings") + } + + if o.NewClient == nil { + return nil, fmt.Errorf("invalid NewClient settings") + } + + if o.InitCapacity < 0 || o.MaxCapacity < 0 { + return nil, fmt.Errorf("invalid capacity settings") + } + + p := &pool{ + dial: o.Dial, + newClient: o.NewClient, + initCapacity: o.InitCapacity, + maxCapacity: o.MaxCapacity, + clients: make(chan PooledClient, o.InitCapacity), + } + + var err error + p.conn, err = p.dial() + if nil != err { + return nil, err + } + + for i := 0; i < p.initCapacity; i++ { + pc, err := p.create() + if err != nil { + p.conn.Close() + return nil, err + } + p.clients <- pc + } + + return p, nil +} + +func (p *pool) Capacity() int { + return cap(p.clients) +} + +func (p *pool) Available() int { + return len(p.clients) +} + +func (p *pool) Get() (PooledClient, error) { + if p.clients == nil { + // pool aleardy destroyed, returns new client + return p.create() + } + for { + select { + case c := <-p.clients: + return c, nil + default: + return p.create() + } + } +} + +func (p *pool) Destroy() { + p.mtx.Lock() + defer p.mtx.Unlock() + if p.clients == nil { + // pool aleardy destroyed + return + } + close(p.clients) + p.clients = nil + p.conn.Close() +} + +func (p *pool) create() (PooledClient, error) { + c, err := p.newClient(p.conn) + if err != nil { + return nil, err + } + pc := &pooledClient{ + p: p, + c: c, + } + return pc, nil +} + +type pooledClient struct { + p *pool + c BackendClient +} + +func (pc *pooledClient) Exec(ctx context.Context, target string, method string, params []string) (string, error) { + si := &BackendClientInput{ + Target: target, + Method: method, + Params: params, + } + so, err := pc.c.Exec(ctx, si) + if nil != err { + return "", err + } + + return so.Result, nil +} + +func (pc *pooledClient) Close() { + select { + case pc.p.clients <- pc: + return + default: + // pool is full, close passed connection + return + } +} diff --git a/clients/client.go b/clients/client.go new file mode 100644 index 0000000..18c7908 --- /dev/null +++ b/clients/client.go @@ -0,0 +1,7 @@ +package clients + +type Client interface { +} + +type client struct { +} diff --git a/clients/context.go b/clients/context.go new file mode 100644 index 0000000..ab29bab --- /dev/null +++ b/clients/context.go @@ -0,0 +1,7 @@ +package clients + +type Context interface { +} + +type context struct { +} diff --git a/glide.yaml b/glide.yaml index bd02318..e117fa5 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,3 +7,8 @@ import: version: v1.1.0 subpackages: - redis +- package: google.golang.org/grpc + version: v1.5.2 +- package: git.loafle.net/overflow/overflow_api_server + subpackages: + - golang diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 19fa7a6..6ee42c1 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -4,8 +4,8 @@ type ( OnSubscribeFunc func(channel string, payload string) ) -type Pubsub interface { - Subscribe(cb OnSubscribeFunc) +type PubSub interface { + Subscribe(channel string, cb OnSubscribeFunc) } type pubSub struct { diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index d0945f1..37d47e5 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -1,6 +1,8 @@ package redis import ( + "log" + "sync" "time" "git.loafle.net/overflow/overflow_gateway_websocket/pubsub" @@ -9,9 +11,11 @@ import ( type redisPubSub struct { pool *redis.Pool + + mtx sync.Mutex } -func New() pubsub.Pubsub { +func New(addr string) (pubsub.PubSub, error) { r := &redisPubSub{} r.pool = &redis.Pool{ MaxIdle: 3, @@ -20,5 +24,24 @@ func New() pubsub.Pubsub { return redis.Dial("tcp", addr) }, } - return r + return r, nil +} + +func (r *redisPubSub) Subscribe(channel string, cb pubsub.OnSubscribeFunc) { + conn := r.pool.Get() + defer conn.Close() + psc := redis.PubSubConn{Conn: conn} + psc.Subscribe(channel) + for { + switch v := psc.Receive().(type) { + case redis.Message: + log.Printf("message: %s: %s\n", v.Channel, string(v.Data)) + case redis.Subscription: + log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count) + case error: + log.Println("error pub/sub, delivery has stopped") + return + default: + } + } } diff --git a/server.go b/server.go index 2f0bf70..25855ac 100644 --- a/server.go +++ b/server.go @@ -1,5 +1,12 @@ package overflow_gateway_websocket +import ( + "git.loafle.net/overflow/overflow_gateway_websocket/backend" + "git.loafle.net/overflow/overflow_gateway_websocket/pubsub" + "git.loafle.net/overflow/overflow_gateway_websocket/pubsub/redis" + "google.golang.org/grpc" +) + type ( // OnPreRequestFunc is callback function OnPreRequestFunc func() @@ -21,6 +28,9 @@ type server struct { onPostRequestListeners []OnPostRequestFunc onPreNotifyListeners []OnPreNotifyFunc onPrePushListeners []OnPrePushFunc + + backendPool backend.Pool + pubSub pubsub.PubSub } func NewServer() Server { @@ -30,6 +40,26 @@ func NewServer() Server { onPreNotifyListeners: make([]OnPreNotifyFunc, 0), onPrePushListeners: make([]OnPrePushFunc, 0), } + + var err error + s.backendPool, err = backend.NewPool(backend.Options{ + Dial: func() (*grpc.ClientConn, error) { + return nil, nil + }, + NewClient: func(*grpc.ClientConn) (backend.BackendClient, error) { + return nil, nil + }, + }) + + if nil != err { + + } + + s.pubSub, err = redis.New("") + if nil != err { + + } + return s }