ing
This commit is contained in:
		
							parent
							
								
									d3f40ec6be
								
							
						
					
					
						commit
						1e1e212954
					
				| @ -20,3 +20,4 @@ import: | ||||
|   - metadata | ||||
| - package: git.loafle.net/overflow/overflow_gateway_websocket | ||||
| - package: git.loafle.net/commons_go/config | ||||
| - package: git.loafle.net/commons_go/redis_pool | ||||
|  | ||||
							
								
								
									
										27
									
								
								grpc/pool.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								grpc/pool.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,27 @@ | ||||
| package grpc | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	ogp "git.loafle.net/overflow/overflow_grpc_pool" | ||||
| ) | ||||
| 
 | ||||
| func NewPool(ctx context.Context) ogp.Pool { | ||||
| 	h := &poolHandlers{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 	} | ||||
| 	h.cfg = config.Sub("grpc") | ||||
| 	h.MaxIdle = h.cfg.GetInt("pool.MaxIdle") | ||||
| 	h.MaxCapacity = h.cfg.GetInt("pool.MaxCapacity") | ||||
| 
 | ||||
| 	p, err := ogp.New(ctx, h) | ||||
| 	if nil != err { | ||||
| 		h.logger.Fatal(fmt.Sprintf("GRpc Pool: %v", err)) | ||||
| 	} | ||||
| 
 | ||||
| 	return p | ||||
| } | ||||
							
								
								
									
										30
									
								
								grpc/pool_handlers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								grpc/pool_handlers.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,30 @@ | ||||
| package grpc | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"google.golang.org/grpc" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	oas "git.loafle.net/overflow/overflow_api_server/golang" | ||||
| 	ogp "git.loafle.net/overflow/overflow_grpc_pool" | ||||
| 	"go.uber.org/zap" | ||||
| ) | ||||
| 
 | ||||
| type poolHandlers struct { | ||||
| 	ogp.PoolHandlers | ||||
| 	ctx    context.Context | ||||
| 	logger *zap.Logger | ||||
| 	cfg    config.Configurator | ||||
| } | ||||
| 
 | ||||
| func (h *poolHandlers) OnCreate() (*grpc.ClientConn, interface{}, error) { | ||||
| 	var err error | ||||
| 	conn, err := grpc.Dial(config.GetString("grpc.addr"), grpc.WithInsecure()) | ||||
| 	if nil != err { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| 	c := oas.NewOverflowApiServerClient(conn) | ||||
| 	return conn, c, nil | ||||
| 
 | ||||
| } | ||||
| @ -1,67 +0,0 @@ | ||||
| package file | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"log" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"go.uber.org/zap" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/handler" | ||||
| 	gws "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" | ||||
| ) | ||||
| 
 | ||||
| type FileHandler interface { | ||||
| 	handler.Handler | ||||
| } | ||||
| 
 | ||||
| type fileHandler struct { | ||||
| 	ctx     context.Context | ||||
| 	logger  *zap.Logger | ||||
| 	co      *gws.SocketOptions | ||||
| 	ho      *jsonrpc.Options | ||||
| 	handler gws.MessageHandler | ||||
| } | ||||
| 
 | ||||
| func New(ctx context.Context) FileHandler { | ||||
| 	h := &fileHandler{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 	} | ||||
| 
 | ||||
| 	h.ho = &jsonrpc.Options{ | ||||
| 		OnRequest: h.onRequest, | ||||
| 		OnNotify:  h.onNotify, | ||||
| 	} | ||||
| 
 | ||||
| 	h.handler = jsonrpc.NewHandler(ctx, h.ho) | ||||
| 
 | ||||
| 	h.co = &gws.SocketOptions{ | ||||
| 		Handler:        h.handler, | ||||
| 		MaxMessageSize: config.GetInt64("handlers.file.socket.MaxMessageSize"), | ||||
| 		WriteTimeout:   time.Duration(config.GetInt("handlers.file.socket.WriteTimeout")) * time.Second, | ||||
| 		ReadTimeout:    time.Duration(config.GetInt("handlers.file.socket.ReadTimeout")) * time.Second, | ||||
| 		PongTimeout:    time.Duration(config.GetInt("handlers.file.socket.PongTimeout")) * time.Second, | ||||
| 		PingTimeout:    time.Duration(config.GetInt("handlers.file.socket.PingTimeout")) * time.Second, | ||||
| 		PingPeriod:     time.Duration(config.GetInt("handlers.file.socket.PingPeriod")) * time.Second, | ||||
| 		BinaryMessage:  config.GetBool("handlers.file.socket.BinaryMessage"), | ||||
| 	} | ||||
| 
 | ||||
| 	return h | ||||
| } | ||||
| 
 | ||||
| func (h *fileHandler) GetSocketOption() *gws.SocketOptions { | ||||
| 	return h.co | ||||
| } | ||||
| 
 | ||||
| func (h *fileHandler) onRequest(soc gws.Socket, method string, params []string) (interface{}, error) { | ||||
| 	log.Printf("path: %s, m: %s, params: %v", soc.Path(), method, params) | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| func (h *fileHandler) onNotify(soc gws.Socket, method string, params []string) error { | ||||
| 	return nil | ||||
| } | ||||
| @ -1,9 +0,0 @@ | ||||
| package handler | ||||
| 
 | ||||
| import ( | ||||
| 	gws "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| ) | ||||
| 
 | ||||
| type Handler interface { | ||||
| 	GetSocketOption() *gws.SocketOptions | ||||
| } | ||||
| @ -1,106 +0,0 @@ | ||||
| package web | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"go.uber.org/zap" | ||||
| 
 | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	backGRpc "git.loafle.net/overflow/overflow_api_server/golang" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/handler" | ||||
| 	gws "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" | ||||
| 	grpcPool "git.loafle.net/overflow/overflow_grpc_pool" | ||||
| ) | ||||
| 
 | ||||
| type WebHandler interface { | ||||
| 	handler.Handler | ||||
| } | ||||
| 
 | ||||
| type webHandler struct { | ||||
| 	ctx     context.Context | ||||
| 	logger  *zap.Logger | ||||
| 	so      *gws.SocketOptions | ||||
| 	ro      *jsonrpc.Options | ||||
| 	handler gws.MessageHandler | ||||
| 	pool    grpcPool.Pool | ||||
| } | ||||
| 
 | ||||
| func New(ctx context.Context, pool grpcPool.Pool) WebHandler { | ||||
| 	h := &webHandler{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 		pool:   pool, | ||||
| 	} | ||||
| 
 | ||||
| 	h.ro = &jsonrpc.Options{ | ||||
| 		OnRequest: h.onRequest, | ||||
| 		OnNotify:  h.onNotify, | ||||
| 	} | ||||
| 
 | ||||
| 	h.handler = jsonrpc.NewHandler(ctx, h.ro) | ||||
| 
 | ||||
| 	h.so = &gws.SocketOptions{ | ||||
| 		Handler:        h.handler, | ||||
| 		MaxMessageSize: config.GetInt64("handlers.web.socket.MaxMessageSize"), | ||||
| 		WriteTimeout:   time.Duration(config.GetInt("handlers.web.socket.WriteTimeout")) * time.Second, | ||||
| 		ReadTimeout:    time.Duration(config.GetInt("handlers.web.socket.ReadTimeout")) * time.Second, | ||||
| 		PongTimeout:    time.Duration(config.GetInt("handlers.web.socket.PongTimeout")) * time.Second, | ||||
| 		PingTimeout:    time.Duration(config.GetInt("handlers.web.socket.PingTimeout")) * time.Second, | ||||
| 		PingPeriod:     time.Duration(config.GetInt("handlers.web.socket.PingPeriod")) * time.Second, | ||||
| 		BinaryMessage:  config.GetBool("handlers.web.socket.BinaryMessage"), | ||||
| 	} | ||||
| 
 | ||||
| 	return h | ||||
| } | ||||
| 
 | ||||
| func (h *webHandler) GetSocketOption() *gws.SocketOptions { | ||||
| 	return h.so | ||||
| } | ||||
| 
 | ||||
| func (h *webHandler) onRequest(soc gws.Socket, method string, params []string) (interface{}, error) { | ||||
| 	h.logger.Info("OnRequest", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("method", method), | ||||
| 		zap.Any("params", params), | ||||
| 	) | ||||
| 
 | ||||
| 	c, err := h.pool.Get() | ||||
| 	if err != nil { | ||||
| 		h.logger.Error("cannot retrive GRPC Client") | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer h.pool.Put(c) | ||||
| 
 | ||||
| 	sm := strings.Split(method, ".") | ||||
| 
 | ||||
| 	si := &backGRpc.ServerInput{ | ||||
| 		Target: sm[0], | ||||
| 		Method: sm[1], | ||||
| 		Params: params, | ||||
| 	} | ||||
| 
 | ||||
| 	md := metadata.Pairs("email", "overflow@loafle.com") | ||||
| 	ctx := metadata.NewOutgoingContext(context.Background(), md) | ||||
| 
 | ||||
| 	so, err := c.(backGRpc.OverflowApiServerClient).Exec(ctx, si) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return so.Result, nil | ||||
| } | ||||
| 
 | ||||
| func (h *webHandler) onNotify(soc gws.Socket, method string, params []string) error { | ||||
| 	h.logger.Info("OnRequest", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("method", method), | ||||
| 		zap.Any("params", params), | ||||
| 	) | ||||
| 	return nil | ||||
| } | ||||
							
								
								
									
										72
									
								
								handler/web_json_handlers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								handler/web_json_handlers.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,72 @@ | ||||
| package handler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"strings" | ||||
| 
 | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	oas "git.loafle.net/overflow/overflow_api_server/golang" | ||||
| 	ogw "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_websocket/protocol/jsonrpc" | ||||
| 	ogp "git.loafle.net/overflow/overflow_grpc_pool" | ||||
| 	"go.uber.org/zap" | ||||
| ) | ||||
| 
 | ||||
| func newWebJSONRpcHandler(ctx context.Context, gPool ogp.Pool) ogw.ProtocolHandler { | ||||
| 	h := &webJSONRpcHandlers{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 		gPool:  gPool, | ||||
| 	} | ||||
| 
 | ||||
| 	p := jsonrpc.New(ctx, h) | ||||
| 
 | ||||
| 	return p | ||||
| } | ||||
| 
 | ||||
| type webJSONRpcHandlers struct { | ||||
| 	jsonrpc.JSONRpcHandlers | ||||
| 	ctx    context.Context | ||||
| 	logger *zap.Logger | ||||
| 	gPool  ogp.Pool | ||||
| } | ||||
| 
 | ||||
| func (h *webJSONRpcHandlers) OnRequest(soc ogw.Socket, method string, params []string) (interface{}, error) { | ||||
| 	h.logger.Info("OnRequest", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("method", method), | ||||
| 		zap.Any("params", params), | ||||
| 	) | ||||
| 
 | ||||
| 	c, err := h.gPool.Get() | ||||
| 	if err != nil { | ||||
| 		h.logger.Error("cannot retrive GRPC Client") | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer h.gPool.Put(c) | ||||
| 
 | ||||
| 	sm := strings.Split(method, ".") | ||||
| 
 | ||||
| 	si := &oas.ServerInput{ | ||||
| 		Target: sm[0], | ||||
| 		Method: sm[1], | ||||
| 		Params: params, | ||||
| 	} | ||||
| 
 | ||||
| 	md := metadata.Pairs("email", "overflow@loafle.com") | ||||
| 	ctx := metadata.NewOutgoingContext(context.Background(), md) | ||||
| 
 | ||||
| 	so, err := c.(oas.OverflowApiServerClient).Exec(ctx, si) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	return so.Result, nil | ||||
| } | ||||
| 
 | ||||
| func (wjh *webJSONRpcHandlers) OnNotify(soc ogw.Socket, method string, params []string) error { | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
							
								
								
									
										40
									
								
								handler/web_socket_handlers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								handler/web_socket_handlers.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,40 @@ | ||||
| package handler | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	ogw "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| 	ogp "git.loafle.net/overflow/overflow_grpc_pool" | ||||
| 	"go.uber.org/zap" | ||||
| ) | ||||
| 
 | ||||
| type webSocketHandlers struct { | ||||
| 	ogw.SocketHandlers | ||||
| 	ctx    context.Context | ||||
| 	logger *zap.Logger | ||||
| 	cfg    config.Configurator | ||||
| } | ||||
| 
 | ||||
| func NewWebSocketHandler(ctx context.Context, gPool ogp.Pool) ogw.SocketHandler { | ||||
| 	h := &webSocketHandlers{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 	} | ||||
| 	h.cfg = config.Sub("handlers.web") | ||||
| 	h.MaxMessageSize = h.cfg.GetInt64("socket.MaxMessageSize") | ||||
| 	h.WriteTimeout = h.cfg.GetDuration("socket.WriteTimeout") * time.Second | ||||
| 	h.ReadTimeout = h.cfg.GetDuration("socket.ReadTimeout") * time.Second | ||||
| 	h.PongTimeout = h.cfg.GetDuration("socket.PongTimeout") * time.Second | ||||
| 	h.PingTimeout = h.cfg.GetDuration("socket.PingTimeout") * time.Second | ||||
| 	h.PingPeriod = h.cfg.GetDuration("socket.PingPeriod") * time.Second | ||||
| 	h.BinaryMessage = h.cfg.GetBool("socket.BinaryMessage") | ||||
| 
 | ||||
| 	ph := newWebJSONRpcHandler(ctx, gPool) | ||||
| 
 | ||||
| 	h.Protocol = ph | ||||
| 
 | ||||
| 	return h | ||||
| } | ||||
							
								
								
									
										133
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										133
									
								
								main.go
									
									
									
									
									
								
							| @ -4,101 +4,49 @@ import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"log" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"go.uber.org/zap" | ||||
| 
 | ||||
| 	"google.golang.org/grpc" | ||||
| 
 | ||||
| 	"github.com/garyburd/redigo/redis" | ||||
| 	"github.com/valyala/fasthttp" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	backGRpc "git.loafle.net/overflow/overflow_api_server/golang" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/handler/file" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/handler/web" | ||||
| 	gws "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| 	grpcPool "git.loafle.net/overflow/overflow_grpc_pool" | ||||
| 	subscriberRedis "git.loafle.net/overflow/overflow_subscriber/redis" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/grpc" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/handler" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/redis" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/server" | ||||
| 	"git.loafle.net/overflow/overflow_gateway_web/subscribe" | ||||
| ) | ||||
| 
 | ||||
| var logger *zap.Logger | ||||
| 
 | ||||
| func main() { | ||||
| 	loadConfig() | ||||
| 	ctx := newContext() | ||||
| 
 | ||||
| 	defer logger.Sync() | ||||
| 
 | ||||
| 	s := server.NewServer(ctx) | ||||
| 	rp := redis.NewPool(ctx) | ||||
| 	gp := grpc.NewPool(ctx) | ||||
| 
 | ||||
| 	wsh := handler.NewWebSocketHandler(ctx, gp) | ||||
| 
 | ||||
| 	subscribe.Subscribe(ctx, rp.Get()) | ||||
| 
 | ||||
| 	s.HandleSocket(config.GetString("handlers.web.entry"), wsh) | ||||
| 
 | ||||
| 	s.ListenAndServe(config.GetString("server.addr")) | ||||
| } | ||||
| 
 | ||||
| func loadConfig() { | ||||
| 	config.SetConfigName("config") | ||||
| 	config.AddConfigPath(".") | ||||
| 	err := config.ReadInConfig() | ||||
| 	if nil != err { | ||||
| 		log.Fatalf("config error: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	ctx, cancel := NewContext() | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	logger = logging.WithContext(ctx) | ||||
| 	defer logger.Sync() | ||||
| 
 | ||||
| 	o := &gws.ServerOptions{ | ||||
| 		OnConnection:      onConnection, | ||||
| 		OnDisconnected:    onDisconnected, | ||||
| 		OnCheckOrigin:     onCheckOrigin, | ||||
| 		HandshakeTimeout:  time.Duration(config.GetInt("websocket.HandshakeTimeout")) * time.Second, | ||||
| 		ReadBufferSize:    config.GetInt("websocket.ReadBufferSize"), | ||||
| 		WriteBufferSize:   config.GetInt("websocket.WriteBufferSize"), | ||||
| 		EnableCompression: config.GetBool("websocket.EnableCompression"), | ||||
| 	} | ||||
| 	s := gws.NewServer(ctx, o) | ||||
| 
 | ||||
| 	rPool := &redis.Pool{ | ||||
| 		MaxIdle:     config.GetInt("redis.pool.MaxIdle"), | ||||
| 		IdleTimeout: time.Duration(config.GetInt("redis.pool.IdleTimeout")) * time.Second, | ||||
| 		Dial: func() (redis.Conn, error) { | ||||
| 			return redis.Dial(config.GetString("redis.network"), config.GetString("redis.addr")) | ||||
| 		}, | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		rPool.Close() | ||||
| 	}() | ||||
| 
 | ||||
| 	noti := subscriberRedis.New(ctx, rPool.Get()) | ||||
| 
 | ||||
| 	noti.Subscribe("web", func(channel string, payload string) { | ||||
| 		logger.Info("Subscribe", | ||||
| 			zap.String("channel", channel), | ||||
| 			zap.String("payload", payload), | ||||
| 		) | ||||
| 	}) | ||||
| 
 | ||||
| 	bo := &grpcPool.Options{ | ||||
| 		MaxIdle:     config.GetInt("grpc.pool.MaxIdle"), | ||||
| 		MaxCapacity: config.GetInt("grpc.pool.MaxCapacity"), | ||||
| 
 | ||||
| 		Creators: func() (*grpc.ClientConn, interface{}, error) { | ||||
| 			var err error | ||||
| 			conn, err := grpc.Dial(config.GetString("grpc.addr"), grpc.WithInsecure()) | ||||
| 			if nil != err { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			c := backGRpc.NewOverflowApiServerClient(conn) | ||||
| 			return conn, c, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	bPool, err := grpcPool.New(ctx, bo) | ||||
| 	if nil != err { | ||||
| 		logger.Panic("Cannot create Pool of GRPC") | ||||
| 	} | ||||
| 
 | ||||
| 	wh := web.New(ctx, bPool) | ||||
| 	fh := file.New(ctx) | ||||
| 
 | ||||
| 	s.HandleSocket(config.GetString("handlers.web.entry"), wh.GetSocketOption()) | ||||
| 	s.HandleSocket(config.GetString("handlers.file.entry"), fh.GetSocketOption()) | ||||
| 
 | ||||
| 	s.ListenAndServe(config.GetString("server.addr")) | ||||
| } | ||||
| 
 | ||||
| func NewContext() (context.Context, context.CancelFunc) { | ||||
| func newContext() context.Context { | ||||
| 	var err error | ||||
| 	ctx := context.Background() | ||||
| 	logConfig := config.Sub("logging") | ||||
| @ -112,39 +60,12 @@ func NewContext() (context.Context, context.CancelFunc) { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 
 | ||||
| 	logger, err := cfg.Build() | ||||
| 	logger, err = cfg.Build() | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 
 | ||||
| 	ctx = logging.NewContext(ctx, logger) | ||||
| 
 | ||||
| 	ctx, cancel := context.WithCancel(ctx) | ||||
| 
 | ||||
| 	return ctx, cancel | ||||
| } | ||||
| 
 | ||||
| func onCheckOrigin(ctx *fasthttp.RequestCtx) bool { | ||||
| 	if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" { | ||||
| 		ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) | ||||
| 		if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" { | ||||
| 			ctx.Response.Header.Set("Access-Control-Allow-Headers", "Content-Type, Accept") | ||||
| 			ctx.Response.Header.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE") | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| func onConnection(soc gws.Socket) { | ||||
| 	logger.Info("connect", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("id", soc.ID()), | ||||
| 	) | ||||
| } | ||||
| 
 | ||||
| func onDisconnected(soc gws.Socket) { | ||||
| 	logger.Info("disconnect", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("id", soc.ID()), | ||||
| 	) | ||||
| 	return ctx | ||||
| } | ||||
|  | ||||
							
								
								
									
										24
									
								
								redis/pool.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								redis/pool.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,24 @@ | ||||
| package redis | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	rp "git.loafle.net/commons_go/redis_pool" | ||||
| ) | ||||
| 
 | ||||
| func NewPool(ctx context.Context) rp.Pool { | ||||
| 	h := &poolHandlers{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 	} | ||||
| 	h.cfg = config.Sub("redis") | ||||
| 	h.MaxIdle = h.cfg.GetInt("pool.MaxIdle") | ||||
| 	h.IdleTimeout = h.cfg.GetDuration("pool.IdleTimeout") * time.Second | ||||
| 
 | ||||
| 	p := rp.NewPool(ctx, h) | ||||
| 
 | ||||
| 	return p | ||||
| } | ||||
							
								
								
									
										27
									
								
								redis/pool_handlers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								redis/pool_handlers.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,27 @@ | ||||
| package redis | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/garyburd/redigo/redis" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	rp "git.loafle.net/commons_go/redis_pool" | ||||
| 	"go.uber.org/zap" | ||||
| ) | ||||
| 
 | ||||
| type poolHandlers struct { | ||||
| 	rp.PoolHandlers | ||||
| 	ctx    context.Context | ||||
| 	logger *zap.Logger | ||||
| 	cfg    config.Configurator | ||||
| } | ||||
| 
 | ||||
| func (h *poolHandlers) Dial() (redis.Conn, error) { | ||||
| 	return redis.Dial(h.cfg.GetString("network"), h.cfg.GetString("addr")) | ||||
| } | ||||
| 
 | ||||
| func (h *poolHandlers) TestOnBorrow(c redis.Conn, t time.Time) error { | ||||
| 	return nil | ||||
| } | ||||
							
								
								
									
										26
									
								
								server/server.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										26
									
								
								server/server.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,26 @@ | ||||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	ogw "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| ) | ||||
| 
 | ||||
| func NewServer(ctx context.Context) ogw.Server { | ||||
| 	h := &serverHandlers{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 	} | ||||
| 	h.cfg = config.Sub("websocket") | ||||
| 	h.HandshakeTimeout = h.cfg.GetDuration("HandshakeTimeout") * time.Second | ||||
| 	h.ReadBufferSize = h.cfg.GetInt("ReadBufferSize") | ||||
| 	h.WriteBufferSize = h.cfg.GetInt("WriteBufferSize") | ||||
| 	h.EnableCompression = h.cfg.GetBool("EnableCompression") | ||||
| 
 | ||||
| 	s := ogw.NewServer(ctx, h) | ||||
| 
 | ||||
| 	return s | ||||
| } | ||||
							
								
								
									
										45
									
								
								server/server_handlers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								server/server_handlers.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,45 @@ | ||||
| package server | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/config" | ||||
| 	"github.com/valyala/fasthttp" | ||||
| 	"go.uber.org/zap" | ||||
| 
 | ||||
| 	ogw "git.loafle.net/overflow/overflow_gateway_websocket" | ||||
| ) | ||||
| 
 | ||||
| type serverHandlers struct { | ||||
| 	ogw.ServerHandlers | ||||
| 	ctx    context.Context | ||||
| 	logger *zap.Logger | ||||
| 	cfg    config.Configurator | ||||
| } | ||||
| 
 | ||||
| func (sh *serverHandlers) OnConnection(soc ogw.Socket) { | ||||
| 	sh.logger.Info("connect", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("id", soc.ID()), | ||||
| 	) | ||||
| } | ||||
| 
 | ||||
| func (sh *serverHandlers) OnDisconnected(soc ogw.Socket) { | ||||
| 	sh.logger.Info("disconnect", | ||||
| 		zap.String("path", soc.Path()), | ||||
| 		zap.String("id", soc.ID()), | ||||
| 	) | ||||
| } | ||||
| func (sh *serverHandlers) OnCheckOrigin(ctx *fasthttp.RequestCtx) bool { | ||||
| 	if origin := string(ctx.Request.Header.Peek("Origin")); origin != "" { | ||||
| 		ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) | ||||
| 		if string(ctx.Method()) == "OPTIONS" && string(ctx.Request.Header.Peek("Access-Control-Request-Method")) != "" { | ||||
| 			ctx.Response.Header.Set("Access-Control-Allow-Headers", "Content-Type, Accept") | ||||
| 			ctx.Response.Header.Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE") | ||||
| 		} | ||||
| 	} | ||||
| 	return true | ||||
| } | ||||
| func (sh *serverHandlers) OnError(ctx *fasthttp.RequestCtx, status int, reason error) { | ||||
| 
 | ||||
| } | ||||
							
								
								
									
										15
									
								
								subscribe/redis.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								subscribe/redis.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,15 @@ | ||||
| package subscribe | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	ofs_redis "git.loafle.net/overflow/overflow_subscriber/redis" | ||||
| 	"github.com/garyburd/redigo/redis" | ||||
| ) | ||||
| 
 | ||||
| func Subscribe(ctx context.Context, redisConn redis.Conn) { | ||||
| 	s := ofs_redis.New(ctx, redisConn) | ||||
| 	webS := newWebSubscriberHandler(ctx, "web") | ||||
| 
 | ||||
| 	s.Subscribe(webS) | ||||
| } | ||||
							
								
								
									
										31
									
								
								subscribe/web_subscriber_handlers.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								subscribe/web_subscriber_handlers.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,31 @@ | ||||
| package subscribe | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 
 | ||||
| 	"git.loafle.net/commons_go/logging" | ||||
| 	ofs "git.loafle.net/overflow/overflow_subscriber" | ||||
| 	"go.uber.org/zap" | ||||
| ) | ||||
| 
 | ||||
| func newWebSubscriberHandler(ctx context.Context, channel string) ofs.SubscriberHandler { | ||||
| 	h := &webSubscriberHandlers{ | ||||
| 		ctx:    ctx, | ||||
| 		logger: logging.WithContext(ctx), | ||||
| 	} | ||||
| 	h.Channel = channel | ||||
| 
 | ||||
| 	return h | ||||
| } | ||||
| 
 | ||||
| type webSubscriberHandlers struct { | ||||
| 	ofs.SubscriberHandlers | ||||
| 	ctx    context.Context | ||||
| 	logger *zap.Logger | ||||
| } | ||||
| 
 | ||||
| func (h *webSubscriberHandlers) OnSubscribe(payload string) { | ||||
| 	h.logger.Info("Subscriber:Web", | ||||
| 		zap.String("payload", payload), | ||||
| 	) | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user