ing
This commit is contained in:
		
							parent
							
								
									192344a034
								
							
						
					
					
						commit
						5daa3f6f4e
					
				
							
								
								
									
										8
									
								
								.vscode/settings.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										8
									
								
								.vscode/settings.json
									
									
									
									
										vendored
									
									
								
							@ -1,11 +1,3 @@
 | 
			
		||||
// Place your settings in this file to overwrite default and user settings.
 | 
			
		||||
{
 | 
			
		||||
  // Specifies Lint tool name.
 | 
			
		||||
  "go.lintTool": "gometalinter",
 | 
			
		||||
 | 
			
		||||
  // Flags to pass to Lint tool (e.g. ["-min_confidence=.8"])
 | 
			
		||||
  "go.lintFlags": [
 | 
			
		||||
    "--config=${workspaceRoot}/golint.json"
 | 
			
		||||
  ]
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										15
									
								
								glide.yaml
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								glide.yaml
									
									
									
									
									
								
							@ -1,16 +1,21 @@
 | 
			
		||||
package: git.loafle.net/overflow/overflow_gateway_web
 | 
			
		||||
import:
 | 
			
		||||
- package: git.loafle.net/overflow/overflow_gateway_websocket
 | 
			
		||||
- package: google.golang.org/grpc
 | 
			
		||||
  version: v1.5.2
 | 
			
		||||
- package: git.loafle.net/commons_go/logging
 | 
			
		||||
- package: git.loafle.net/overflow/overflow_api_server
 | 
			
		||||
  subpackages:
 | 
			
		||||
  - golang
 | 
			
		||||
- package: git.loafle.net/overflow/overflow_grpc_pool
 | 
			
		||||
- package: git.loafle.net/overflow/overflow_subscriber
 | 
			
		||||
  subpackages:
 | 
			
		||||
  - redis
 | 
			
		||||
- package: github.com/garyburd/redigo
 | 
			
		||||
  version: v1.1.0
 | 
			
		||||
  subpackages:
 | 
			
		||||
  - redis
 | 
			
		||||
- package: git.loafle.net/commons_go/util
 | 
			
		||||
- package: github.com/valyala/fasthttp
 | 
			
		||||
  version: v20160617
 | 
			
		||||
- package: google.golang.org/grpc
 | 
			
		||||
  version: v1.5.2
 | 
			
		||||
  subpackages:
 | 
			
		||||
  - channel
 | 
			
		||||
  - metadata
 | 
			
		||||
- package: git.loafle.net/overflow/overflow_gateway_websocket
 | 
			
		||||
 | 
			
		||||
@ -1,8 +1,12 @@
 | 
			
		||||
package file
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"log"
 | 
			
		||||
 | 
			
		||||
	"go.uber.org/zap"
 | 
			
		||||
 | 
			
		||||
	"git.loafle.net/commons_go/logging"
 | 
			
		||||
	"git.loafle.net/overflow/overflow_gateway_web/handler"
 | 
			
		||||
	gws "git.loafle.net/overflow/overflow_gateway_websocket"
 | 
			
		||||
	"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc"
 | 
			
		||||
@ -13,20 +17,25 @@ type FileHandler interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type fileHandler struct {
 | 
			
		||||
	ctx     context.Context
 | 
			
		||||
	logger  *zap.Logger
 | 
			
		||||
	co      *gws.SocketOptions
 | 
			
		||||
	ho      *jsonrpc.Options
 | 
			
		||||
	handler gws.MessageHandler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New() FileHandler {
 | 
			
		||||
	h := &fileHandler{}
 | 
			
		||||
func New(ctx context.Context) FileHandler {
 | 
			
		||||
	h := &fileHandler{
 | 
			
		||||
		ctx:    ctx,
 | 
			
		||||
		logger: logging.WithContext(ctx),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	h.ho = &jsonrpc.Options{
 | 
			
		||||
		OnRequest: h.onRequest,
 | 
			
		||||
		OnNotify:  h.onNotify,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	h.handler = jsonrpc.NewHandler(h.ho)
 | 
			
		||||
	h.handler = jsonrpc.NewHandler(ctx, h.ho)
 | 
			
		||||
 | 
			
		||||
	h.co = &gws.SocketOptions{
 | 
			
		||||
		Handler: h.handler,
 | 
			
		||||
 | 
			
		||||
@ -2,11 +2,13 @@ package web
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"log"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"go.uber.org/zap"
 | 
			
		||||
 | 
			
		||||
	"google.golang.org/grpc/metadata"
 | 
			
		||||
 | 
			
		||||
	"git.loafle.net/commons_go/logging"
 | 
			
		||||
	backGRpc "git.loafle.net/overflow/overflow_api_server/golang"
 | 
			
		||||
	"git.loafle.net/overflow/overflow_gateway_web/handler"
 | 
			
		||||
	gws "git.loafle.net/overflow/overflow_gateway_websocket"
 | 
			
		||||
@ -19,15 +21,19 @@ type WebHandler interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type webHandler struct {
 | 
			
		||||
	ctx     context.Context
 | 
			
		||||
	logger  *zap.Logger
 | 
			
		||||
	so      *gws.SocketOptions
 | 
			
		||||
	ro      *jsonrpc.Options
 | 
			
		||||
	handler gws.MessageHandler
 | 
			
		||||
	pool    grpcPool.Pool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(pool grpcPool.Pool) WebHandler {
 | 
			
		||||
func New(ctx context.Context, pool grpcPool.Pool) WebHandler {
 | 
			
		||||
	h := &webHandler{
 | 
			
		||||
		pool: pool,
 | 
			
		||||
		ctx:    ctx,
 | 
			
		||||
		logger: logging.WithContext(ctx),
 | 
			
		||||
		pool:   pool,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	h.ro = &jsonrpc.Options{
 | 
			
		||||
@ -35,7 +41,7 @@ func New(pool grpcPool.Pool) WebHandler {
 | 
			
		||||
		OnNotify:  h.onNotify,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	h.handler = jsonrpc.NewHandler(h.ro)
 | 
			
		||||
	h.handler = jsonrpc.NewHandler(ctx, h.ro)
 | 
			
		||||
 | 
			
		||||
	h.so = &gws.SocketOptions{
 | 
			
		||||
		Handler: h.handler,
 | 
			
		||||
@ -49,11 +55,15 @@ func (h *webHandler) GetSocketOption() *gws.SocketOptions {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *webHandler) onRequest(soc gws.Socket, method string, params []string) (interface{}, error) {
 | 
			
		||||
	log.Printf("path: %s, m: %s, params: %v", soc.Path(), method, params)
 | 
			
		||||
	h.logger.Info("OnRequest",
 | 
			
		||||
		zap.String("path", soc.Path()),
 | 
			
		||||
		zap.String("method", method),
 | 
			
		||||
		zap.Any("params", params),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	c, err := h.pool.Get()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Println("cannot retrive GRPC Client")
 | 
			
		||||
		h.logger.Error("cannot retrive GRPC Client")
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	defer h.pool.Put(c)
 | 
			
		||||
@ -78,6 +88,10 @@ func (h *webHandler) onRequest(soc gws.Socket, method string, params []string) (
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *webHandler) onNotify(soc gws.Socket, method string, params []string) error {
 | 
			
		||||
	log.Printf("path: %s, m: %s, params: %v", soc.Path(), method, params)
 | 
			
		||||
	h.logger.Info("OnRequest",
 | 
			
		||||
		zap.String("path", soc.Path()),
 | 
			
		||||
		zap.String("method", method),
 | 
			
		||||
		zap.Any("params", params),
 | 
			
		||||
	)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										41
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										41
									
								
								main.go
									
									
									
									
									
								
							@ -2,26 +2,32 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"log"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"go.uber.org/zap"
 | 
			
		||||
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
 | 
			
		||||
	"github.com/garyburd/redigo/redis"
 | 
			
		||||
	"github.com/valyala/fasthttp"
 | 
			
		||||
 | 
			
		||||
	"git.loafle.net/commons_go/logging"
 | 
			
		||||
	backGRpc "git.loafle.net/overflow/overflow_api_server/golang"
 | 
			
		||||
	"git.loafle.net/overflow/overflow_gateway_web/handler/file"
 | 
			
		||||
	"git.loafle.net/overflow/overflow_gateway_web/handler/web"
 | 
			
		||||
	notiRedis "git.loafle.net/overflow/overflow_gateway_web/notification/redis"
 | 
			
		||||
	gws "git.loafle.net/overflow/overflow_gateway_websocket"
 | 
			
		||||
	grpcPool "git.loafle.net/overflow/overflow_grpc_pool"
 | 
			
		||||
	subscriberRedis "git.loafle.net/overflow/overflow_subscriber/redis"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var logger *zap.Logger
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	ctx, cancel := NewContext()
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	logger = logging.WithContext(ctx)
 | 
			
		||||
 | 
			
		||||
	o := &gws.ServerOptions{
 | 
			
		||||
		OnConnection:   onConnection,
 | 
			
		||||
		OnDisconnected: onDisconnected,
 | 
			
		||||
@ -36,12 +42,17 @@ func main() {
 | 
			
		||||
			return redis.Dial("tcp", "127.0.0.1:6379")
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	defer rPool.Close()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		rPool.Close()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	noti := notiRedis.New(ctx, rPool.Get())
 | 
			
		||||
	noti := subscriberRedis.New(ctx, rPool.Get())
 | 
			
		||||
 | 
			
		||||
	noti.Subscribe("web", func(channel string, payload string) {
 | 
			
		||||
		log.Printf("c:%s, p:%s", channel, payload)
 | 
			
		||||
		logger.Info("Subscribe",
 | 
			
		||||
			zap.String("channel", channel),
 | 
			
		||||
			zap.String("payload", payload),
 | 
			
		||||
		)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	bo := &grpcPool.Options{
 | 
			
		||||
@ -58,13 +69,13 @@ func main() {
 | 
			
		||||
			return conn, c, nil
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	bPool, err := grpcPool.New(bo)
 | 
			
		||||
	bPool, err := grpcPool.New(ctx, bo)
 | 
			
		||||
	if nil != err {
 | 
			
		||||
		log.Panic("Cannot create Pool of GRPC")
 | 
			
		||||
		logger.Panic("Cannot create Pool of GRPC")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wh := web.New(bPool)
 | 
			
		||||
	fh := file.New()
 | 
			
		||||
	wh := web.New(ctx, bPool)
 | 
			
		||||
	fh := file.New(ctx)
 | 
			
		||||
 | 
			
		||||
	s.HandleSocket("/web", wh.GetSocketOption())
 | 
			
		||||
	s.HandleSocket("/file", fh.GetSocketOption())
 | 
			
		||||
@ -74,6 +85,8 @@ func main() {
 | 
			
		||||
 | 
			
		||||
func NewContext() (context.Context, context.CancelFunc) {
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	ctx = logging.NewContext(ctx, nil)
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithCancel(ctx)
 | 
			
		||||
	ctx = context.WithValue(ctx, "key1", "val1")
 | 
			
		||||
 | 
			
		||||
@ -92,9 +105,15 @@ func onCheckOrigin(ctx *fasthttp.RequestCtx) bool {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func onConnection(soc gws.Socket) {
 | 
			
		||||
	log.Printf("connect: path: %s, id:%s\n", soc.Path(), soc.ID())
 | 
			
		||||
	logger.Info("connect",
 | 
			
		||||
		zap.String("path", soc.Path()),
 | 
			
		||||
		zap.String("id", soc.ID()),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func onDisconnected(soc gws.Socket) {
 | 
			
		||||
	log.Printf("disconnect: path: %s, id:%s\n", soc.Path(), soc.ID())
 | 
			
		||||
	logger.Info("connect",
 | 
			
		||||
		zap.String("path", soc.Path()),
 | 
			
		||||
		zap.String("id", soc.ID()),
 | 
			
		||||
	)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,10 +0,0 @@
 | 
			
		||||
package notification
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	OnSubscribeFunc func(channel string, payload string)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Notifier interface {
 | 
			
		||||
	Subscribe(channel string, cb OnSubscribeFunc)
 | 
			
		||||
	Unsubscribe(channel string, cb OnSubscribeFunc)
 | 
			
		||||
}
 | 
			
		||||
@ -1,120 +0,0 @@
 | 
			
		||||
package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"log"
 | 
			
		||||
 | 
			
		||||
	channelUtil "git.loafle.net/commons_go/util/channel"
 | 
			
		||||
	"git.loafle.net/overflow/overflow_gateway_web/notification"
 | 
			
		||||
	"github.com/garyburd/redigo/redis"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type subscribeChannelAction struct {
 | 
			
		||||
	channelUtil.Action
 | 
			
		||||
	channel string
 | 
			
		||||
	cb      notification.OnSubscribeFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Notifier interface {
 | 
			
		||||
	notification.Notifier
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type notifier struct {
 | 
			
		||||
	ctx                   context.Context
 | 
			
		||||
	conn                  redis.PubSubConn
 | 
			
		||||
	subListeners          map[string]notification.OnSubscribeFunc
 | 
			
		||||
	isListenSubscriptions bool
 | 
			
		||||
	subCh                 chan subscribeChannelAction
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(ctx context.Context, conn redis.Conn) Notifier {
 | 
			
		||||
	n := ¬ifier{
 | 
			
		||||
		ctx:                   ctx,
 | 
			
		||||
		subListeners:          make(map[string]notification.OnSubscribeFunc),
 | 
			
		||||
		isListenSubscriptions: false,
 | 
			
		||||
		subCh: make(chan subscribeChannelAction),
 | 
			
		||||
	}
 | 
			
		||||
	n.conn = redis.PubSubConn{Conn: conn}
 | 
			
		||||
 | 
			
		||||
	go n.listen()
 | 
			
		||||
 | 
			
		||||
	return n
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *notifier) listen() {
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case sa := <-n.subCh:
 | 
			
		||||
			switch sa.Type {
 | 
			
		||||
			case channelUtil.ActionTypeCreate:
 | 
			
		||||
				_, ok := n.subListeners[sa.channel]
 | 
			
		||||
				if ok {
 | 
			
		||||
					log.Fatalf("notification: Subscription of channel[%s] is already exist", sa.channel)
 | 
			
		||||
				} else {
 | 
			
		||||
					n.subListeners[sa.channel] = sa.cb
 | 
			
		||||
					n.conn.Subscribe(sa.channel)
 | 
			
		||||
					n.listenSubscriptions()
 | 
			
		||||
				}
 | 
			
		||||
				break
 | 
			
		||||
			case channelUtil.ActionTypeDelete:
 | 
			
		||||
				_, ok := n.subListeners[sa.channel]
 | 
			
		||||
				if ok {
 | 
			
		||||
					n.conn.Unsubscribe(sa.channel)
 | 
			
		||||
					delete(n.subListeners, sa.channel)
 | 
			
		||||
				} else {
 | 
			
		||||
					log.Fatalf("notification: Subscription of channel[%s] is not exist", sa.channel)
 | 
			
		||||
				}
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		case <-n.ctx.Done():
 | 
			
		||||
			log.Println("redis noti: Context Done")
 | 
			
		||||
			n.conn.Close()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *notifier) listenSubscriptions() {
 | 
			
		||||
	if n.isListenSubscriptions {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			switch v := n.conn.Receive().(type) {
 | 
			
		||||
			case redis.Message:
 | 
			
		||||
				if cb, ok := n.subListeners[v.Channel]; ok {
 | 
			
		||||
					cb(v.Channel, string(v.Data))
 | 
			
		||||
				}
 | 
			
		||||
			case redis.Subscription:
 | 
			
		||||
				log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)
 | 
			
		||||
			case error:
 | 
			
		||||
				log.Println("error pub/sub, delivery has stopped")
 | 
			
		||||
				return
 | 
			
		||||
			default:
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	n.isListenSubscriptions = true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *notifier) Subscribe(channel string, cb notification.OnSubscribeFunc) {
 | 
			
		||||
	ca := subscribeChannelAction{
 | 
			
		||||
		channel: channel,
 | 
			
		||||
		cb:      cb,
 | 
			
		||||
	}
 | 
			
		||||
	ca.Type = channelUtil.ActionTypeCreate
 | 
			
		||||
 | 
			
		||||
	n.subCh <- ca
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *notifier) Unsubscribe(channel string, cb notification.OnSubscribeFunc) {
 | 
			
		||||
	ca := subscribeChannelAction{
 | 
			
		||||
		channel: channel,
 | 
			
		||||
		cb:      cb,
 | 
			
		||||
	}
 | 
			
		||||
	ca.Type = channelUtil.ActionTypeDelete
 | 
			
		||||
 | 
			
		||||
	n.subCh <- ca
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user