This commit is contained in:
crusader 2017-08-21 19:23:45 +09:00
parent 20eed64a96
commit c1de188784
9 changed files with 281 additions and 4 deletions

26
backend/client.go Normal file
View File

@ -0,0 +1,26 @@
package backend
import (
"context"
"google.golang.org/grpc"
)
type BackendClient interface {
Exec(ctx context.Context, in *BackendClientInput, opts ...grpc.CallOption) (*BackendClientOutput, error)
}
type BackendClientInput struct {
Target string
Method string
Params []string
}
type BackendClientOutput struct {
Result string
}
type PooledClient interface {
Exec(ctx context.Context, target string, method string, params []string) (string, error)
Close()
}

26
backend/options.go Normal file
View File

@ -0,0 +1,26 @@
package backend
import (
"google.golang.org/grpc"
)
type Options struct {
// Dial is an application supplied function for creating and configuring a
// grpc connection.
//
// The grpc.ClientConn returned from Dial
Dial func() (*grpc.ClientConn, error)
// NewClient is an application supplied function for creating and configuring a
// client.
//
// The client returned from NewClient
NewClient func(*grpc.ClientConn) (BackendClient, error)
// Initial number of clients in the pool.
InitCapacity int
// Maximum number of clients allocated by the pool at a given time.
// When zero, there is no limit on the number of clients in the pool.
MaxCapacity int
}

153
backend/pool.go Normal file
View File

@ -0,0 +1,153 @@
package backend
import (
"context"
"fmt"
"sync"
"google.golang.org/grpc"
)
type Pool interface {
// Get gets a client. The application must close the returned client.
// This method always returns a valid client so that applications can defer
// error handling to the first use of the client. If there is an error
// getting an underlying client, then the client Err, Do, Send, Flush
// and Receive methods return that error.
Get() (PooledClient, error)
// Capacity returns the number of maximum clients in the pool.
Capacity() int
// Available returns the number of avaliable clients in the pool.
Available() int
// Destroy releases the resources used by the pool.
Destroy()
}
type pool struct {
dial func() (*grpc.ClientConn, error)
newClient func(*grpc.ClientConn) (BackendClient, error)
initCapacity int
maxCapacity int
conn *grpc.ClientConn
mtx sync.Mutex
clients chan PooledClient
}
func NewPool(o Options) (Pool, error) {
if o.Dial == nil {
return nil, fmt.Errorf("invalid Dial settings")
}
if o.NewClient == nil {
return nil, fmt.Errorf("invalid NewClient settings")
}
if o.InitCapacity < 0 || o.MaxCapacity < 0 {
return nil, fmt.Errorf("invalid capacity settings")
}
p := &pool{
dial: o.Dial,
newClient: o.NewClient,
initCapacity: o.InitCapacity,
maxCapacity: o.MaxCapacity,
clients: make(chan PooledClient, o.InitCapacity),
}
var err error
p.conn, err = p.dial()
if nil != err {
return nil, err
}
for i := 0; i < p.initCapacity; i++ {
pc, err := p.create()
if err != nil {
p.conn.Close()
return nil, err
}
p.clients <- pc
}
return p, nil
}
func (p *pool) Capacity() int {
return cap(p.clients)
}
func (p *pool) Available() int {
return len(p.clients)
}
func (p *pool) Get() (PooledClient, error) {
if p.clients == nil {
// pool aleardy destroyed, returns new client
return p.create()
}
for {
select {
case c := <-p.clients:
return c, nil
default:
return p.create()
}
}
}
func (p *pool) Destroy() {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.clients == nil {
// pool aleardy destroyed
return
}
close(p.clients)
p.clients = nil
p.conn.Close()
}
func (p *pool) create() (PooledClient, error) {
c, err := p.newClient(p.conn)
if err != nil {
return nil, err
}
pc := &pooledClient{
p: p,
c: c,
}
return pc, nil
}
type pooledClient struct {
p *pool
c BackendClient
}
func (pc *pooledClient) Exec(ctx context.Context, target string, method string, params []string) (string, error) {
si := &BackendClientInput{
Target: target,
Method: method,
Params: params,
}
so, err := pc.c.Exec(ctx, si)
if nil != err {
return "", err
}
return so.Result, nil
}
func (pc *pooledClient) Close() {
select {
case pc.p.clients <- pc:
return
default:
// pool is full, close passed connection
return
}
}

7
clients/client.go Normal file
View File

@ -0,0 +1,7 @@
package clients
type Client interface {
}
type client struct {
}

7
clients/context.go Normal file
View File

@ -0,0 +1,7 @@
package clients
type Context interface {
}
type context struct {
}

View File

@ -7,3 +7,8 @@ import:
version: v1.1.0
subpackages:
- redis
- package: google.golang.org/grpc
version: v1.5.2
- package: git.loafle.net/overflow/overflow_api_server
subpackages:
- golang

View File

@ -4,8 +4,8 @@ type (
OnSubscribeFunc func(channel string, payload string)
)
type Pubsub interface {
Subscribe(cb OnSubscribeFunc)
type PubSub interface {
Subscribe(channel string, cb OnSubscribeFunc)
}
type pubSub struct {

View File

@ -1,6 +1,8 @@
package redis
import (
"log"
"sync"
"time"
"git.loafle.net/overflow/overflow_gateway_websocket/pubsub"
@ -9,9 +11,11 @@ import (
type redisPubSub struct {
pool *redis.Pool
mtx sync.Mutex
}
func New() pubsub.Pubsub {
func New(addr string) (pubsub.PubSub, error) {
r := &redisPubSub{}
r.pool = &redis.Pool{
MaxIdle: 3,
@ -20,5 +24,24 @@ func New() pubsub.Pubsub {
return redis.Dial("tcp", addr)
},
}
return r
return r, nil
}
func (r *redisPubSub) Subscribe(channel string, cb pubsub.OnSubscribeFunc) {
conn := r.pool.Get()
defer conn.Close()
psc := redis.PubSubConn{Conn: conn}
psc.Subscribe(channel)
for {
switch v := psc.Receive().(type) {
case redis.Message:
log.Printf("message: %s: %s\n", v.Channel, string(v.Data))
case redis.Subscription:
log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
log.Println("error pub/sub, delivery has stopped")
return
default:
}
}
}

View File

@ -1,5 +1,12 @@
package overflow_gateway_websocket
import (
"git.loafle.net/overflow/overflow_gateway_websocket/backend"
"git.loafle.net/overflow/overflow_gateway_websocket/pubsub"
"git.loafle.net/overflow/overflow_gateway_websocket/pubsub/redis"
"google.golang.org/grpc"
)
type (
// OnPreRequestFunc is callback function
OnPreRequestFunc func()
@ -21,6 +28,9 @@ type server struct {
onPostRequestListeners []OnPostRequestFunc
onPreNotifyListeners []OnPreNotifyFunc
onPrePushListeners []OnPrePushFunc
backendPool backend.Pool
pubSub pubsub.PubSub
}
func NewServer() Server {
@ -30,6 +40,26 @@ func NewServer() Server {
onPreNotifyListeners: make([]OnPreNotifyFunc, 0),
onPrePushListeners: make([]OnPrePushFunc, 0),
}
var err error
s.backendPool, err = backend.NewPool(backend.Options{
Dial: func() (*grpc.ClientConn, error) {
return nil, nil
},
NewClient: func(*grpc.ClientConn) (backend.BackendClient, error) {
return nil, nil
},
})
if nil != err {
}
s.pubSub, err = redis.New("")
if nil != err {
}
return s
}