This commit is contained in:
crusader 2017-11-14 01:50:22 +09:00
parent 766cf34e71
commit 4542ab52e6
15 changed files with 296 additions and 158 deletions

View File

@ -3,10 +3,10 @@ package config
var Config GatewayConfig var Config GatewayConfig
type GatewayConfig struct { type GatewayConfig struct {
Server Server `json:"server" yaml:"server" toml:"server"` Server *Server `json:"server" yaml:"server" toml:"server"`
Auth Auth `json:"auth" yaml:"auth" toml:"auth"` Auth *Auth `json:"auth" yaml:"auth" toml:"auth"`
Websocket Websocket `json:"websocket" yaml:"websocket" toml:"websocket"` Websocket *Websocket `json:"websocket" yaml:"websocket" toml:"websocket"`
GRPC GRPC `json:"gRPC" yaml:"gRPC" toml:"gRPC"` GRPC *GRPC `json:"gRPC" yaml:"gRPC" toml:"gRPC"`
Redis Redis `json:"redis" yaml:"redis" toml:"redis"` Redis *Redis `json:"redis" yaml:"redis" toml:"redis"`
Handlers map[string]Handler `json:"handlers" yaml:"handlers" toml:"handlers"` Servlets map[string]*Servlet `json:"servlets" yaml:"servlets" toml:"servlets"`
} }

View File

@ -2,5 +2,5 @@ package config
type GRPC struct { type GRPC struct {
Server Server
Pool Pool `json:"pool" yaml:"pool" toml:"pool"` Pool *Pool `json:"pool" yaml:"pool" toml:"pool"`
} }

View File

@ -1,6 +0,0 @@
package config
type Handler struct {
Entry string `json:"entry" yaml:"entry" toml:"entry"`
Socket Socket `json:"socket" yaml:"socket" toml:"socket"`
}

View File

@ -2,5 +2,5 @@ package config
type Redis struct { type Redis struct {
Server Server
Pool Pool `json:"pool" yaml:"pool" toml:"pool"` Pool *Pool `json:"pool" yaml:"pool" toml:"pool"`
} }

6
config/servlet.go Normal file
View File

@ -0,0 +1,6 @@
package config
type Servlet struct {
Entry string `json:"entry" yaml:"entry" toml:"entry"`
Socket *Socket `json:"socket" yaml:"socket" toml:"socket"`
}

View File

@ -1,4 +1,4 @@
package server package rpc
import ( import (
"context" "context"
@ -30,12 +30,13 @@ func (sh *RPCGatewaySocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id stri
// OnConnect invoked when client is connected // OnConnect invoked when client is connected
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) OnConnect(soc *cwf.Socket) { func (sh *RPCGatewaySocketHandlers) OnConnect(soc cwf.Socket) cwf.Socket {
sh.SocketHandlers.OnConnect(soc) soc = sh.SocketHandlers.OnConnect(soc)
return newSocket(soc, "json")
} }
func (sh *RPCGatewaySocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) { func (sh *RPCGatewaySocketHandlers) Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType) codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err { if nil != err {
log.Printf("RPC Handle: %v", err) log.Printf("RPC Handle: %v", err)
@ -83,7 +84,7 @@ func (sh *RPCGatewaySocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan stru
// OnDisconnect invoked when client is disconnected // OnDisconnect invoked when client is disconnected
// If you override ths method, must call // If you override ths method, must call
func (sh *RPCGatewaySocketHandlers) OnDisconnect(soc *cwf.Socket) { func (sh *RPCGatewaySocketHandlers) OnDisconnect(soc cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc) sh.SocketHandlers.OnDisconnect(soc)
} }

View File

@ -0,0 +1,28 @@
package rpc
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
)
func newSocket(soc cwf.Socket, contentType string) Socket {
newSoc := &socket{
contentType: contentType,
}
newSoc.Socket = soc
return newSoc
}
type Socket interface {
cwf.Socket
GetContentType() string
}
type socket struct {
cwf.Socket
contentType string
}
func (s *socket) GetContentType() string {
return s.contentType
}

View File

@ -1 +1,101 @@
package server package server
import (
"context"
"log"
"github.com/valyala/fasthttp"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type SocketHandlers struct {
cwf.SocketHandlers
}
func (sh *SocketHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err {
return err
}
return nil
}
func (sh *SocketHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *SocketHandlers) OnConnect(soc cwf.Socket) {
sh.SocketHandlers.OnConnect(soc)
}
func (sh *SocketHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = soc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
func (sh *SocketHandlers) OnDisconnect(soc *cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc)
}
// Destroy invoked when server is stopped
// If you override ths method, must call
func (sh *SocketHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *SocketHandlers) Validate() {
sh.SocketHandlers.Validate()
}

View File

@ -9,6 +9,7 @@ import (
type ServerHandler interface { type ServerHandler interface {
cwf.ServerHandler cwf.ServerHandler
RegisterServletHandler(servletHandler servlet.ServletHandler) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler)
RegisterSubscriberHandler(subscriberHandler oos.SubscriberHandler) RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler)
RegisterSubscriber(subscriberHandler oos.SubscriberHandler)
} }

View File

@ -3,6 +3,7 @@ package server
import ( import (
"fmt" "fmt"
"net" "net"
"time"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp" cwf "git.loafle.net/commons_go/websocket_fasthttp"
@ -64,11 +65,32 @@ func (sh *ServerHandlers) OnStop() {
sh.ServerHandlers.OnStop() sh.ServerHandlers.OnStop()
} }
func (sh *ServerHandlers) RegisterServletHandler(servletHandler servlet.ServletHandler) { func (sh *ServerHandlers) RegisterServlet(entryPath string, servletHandler servlet.ServletHandler) {
sh.RegisterSocketHandler(servletHandler) cfg := config.Config.Servlets[entryPath]
if nil == cfg {
logging.Logger().Panic(fmt.Sprintf("Gateway Server: config of entry path[%s] is not exist", entryPath))
return
}
servletH := servletHandler.(*servlet.ServletHandlers)
servletH.MaxMessageSize = cfg.Socket.MaxMessageSize
servletH.WriteTimeout = cfg.Socket.WriteTimeout * time.Second
servletH.ReadTimeout = cfg.Socket.ReadTimeout * time.Second
servletH.PongTimeout = cfg.Socket.PongTimeout * time.Second
servletH.PingTimeout = cfg.Socket.PingTimeout * time.Second
servletH.PingPeriod = cfg.Socket.PingPeriod * time.Second
// servletH.BinaryMessage = cfg.Socket.BinaryMessage
sh.RegisterSocketHandler(entryPath, servletH)
} }
func (sh *ServerHandlers) RegisterSubscriberHandler(subscriberHandler oos.SubscriberHandler) { func (sh *ServerHandlers) RegisterRPCGatewayServlet(entryPath string, servletHandler servlet.RPCGatewayServletHandler) {
}
func (sh *ServerHandlers) RegisterSubscriber(subscriberHandler oos.SubscriberHandler) {
if nil == sh.subscribers { if nil == sh.subscribers {
sh.subscribers = make([]oos.SubscriberHandler, 0) sh.subscribers = make([]oos.SubscriberHandler, 0)
} }

View File

@ -0,0 +1,58 @@
package servlet
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"github.com/valyala/fasthttp"
)
type RPCGatewayServletHandler interface {
// Init invoked when server is stated
// If you override ths method, must call
//
// func (sh *SocketHandler) Init() error {
// if err := sh.SocketHandlers.Init(); nil != err {
// return err
// }
// ...
// return nil
// }
Init() error
// Handshake do handshake client and server
// id is identity of client socket. if id is "", disallow connection
Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader)
// OnConnect invoked when client is connected
// If you override ths method, must call
//
// func (sh *SocketHandler) OnConnect(soc *cwf.Socket) {
// ...
// newSoc := ....
// return sh.SocketHandlers.OnConnect(newSoc)
// }
OnConnect(soc cwf.Socket) cwf.Socket
Handle(soc cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{})
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
//
// func (sh *SocketHandler) OnDisconnect(soc *cwf.Socket) {
// ...
// sh.SocketHandlers.OnDisconnect(soc)
// }
OnDisconnect(soc cwf.Socket)
// Destroy invoked when server is stopped
// If you override ths method, must call
//
// func (sh *SocketHandler) Destroy() {
// ...
// sh.SocketHandlers.Destroy()
// }
Destroy()
// Validate is check handler value
// If you override ths method, must call
//
// func (sh *SocketHandlers) Validate() {
// sh.SocketHandlers.Validate()
// ...
// }
Validate()
}

View File

@ -0,0 +1,35 @@
package servlet
import (
cwf "git.loafle.net/commons_go/websocket_fasthttp"
"github.com/valyala/fasthttp"
)
type RPCGatewayServletHandlers struct {
}
func (sh *RPCGatewayServletHandlers) Init() error {
return nil
}
func (sh *RPCGatewayServletHandlers) Handshake(ctx *fasthttp.RequestCtx) (id string, extensionsHeader *fasthttp.ResponseHeader) {
return "", nil
}
func (sh *RPCGatewayServletHandlers) OnConnect(soc *cwf.Socket) {
}
func (sh *RPCGatewayServletHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
// no op
}
func (sh *RPCGatewayServletHandlers) OnDisconnect(soc *cwf.Socket) {
}
func (sh *RPCGatewayServletHandlers) Destroy() {
// no op
}
func (sh *RPCGatewayServletHandlers) Validate() {
}

View File

@ -1,5 +0,0 @@
package servlet
type RPCServletHandler interface {
ServletHandler
}

View File

@ -1,98 +0,0 @@
package servlet
import (
"context"
"log"
"git.loafle.net/commons_go/rpc/gateway"
"git.loafle.net/commons_go/server"
cwf "git.loafle.net/commons_go/websocket_fasthttp"
oogw "git.loafle.net/overflow/overflow_gateway_websocket"
)
type RPCServletHandlers struct {
ServletHandlers
RPCHandler
}
func (sh *RPCServletHandlers) Init() error {
if err := sh.ServletHandlers.Init(); nil != err {
return err
}
return nil
}
// OnConnect invoked when client is connected
// If you override ths method, must call
func (sh *RPCServletHandlers) OnConnect(soc *cwf.Socket) {
sh.ServletHandlers.OnConnect(soc)
}
func (sh *RPCServletHandlers) Handle(soc *cwf.Socket, stopChan <-chan struct{}, doneChan chan<- struct{}) {
codec, err := sh.rpcServerHandler.GetCodec(sh.ContentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
var socConn *cwf.SocketConn
ctx := context.WithValue(context.Background(), oogw.ServletSocketKey, soc)
// conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
if socConn, err = soc.WaitRequest(); nil != err {
doneChan <- struct{}{}
return
}
// // "git.loafle.net/commons_go/websocket_fasthttp/websocket"
// switch socConn.MessageType {
// case websocket.TextMessage:
// case websocket.BinaryMessage:
// }
if err = gateway.Handle(ctx, sh.rpcServerHandler, codec, socConn, socConn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
if err = socConn.Close(); nil != err {
doneChan <- struct{}{}
return
}
select {
case <-stopChan:
return
default:
}
}
}
// OnDisconnect invoked when client is disconnected
// If you override ths method, must call
func (sh *RPCServletHandlers) OnDisconnect(soc *cwf.Socket) {
sh.ServletHandlers.OnDisconnect(soc)
}
// Destroy invoked when server is stopped
// If you override ths method, must call
func (sh *ServletHandlers) Destroy() {
sh.SocketHandlers.Destroy()
}
func (sh *RPCServletHandlers) Validate() {
sh.ServletHandlers.Validate()
}

View File

@ -1,50 +1,46 @@
package servlet package servlet
import ( import (
"git.loafle.net/commons_go/logging"
cwf "git.loafle.net/commons_go/websocket_fasthttp" cwf "git.loafle.net/commons_go/websocket_fasthttp"
) )
type ServletHandlers struct { type ServletHandlers struct {
cwf.SocketHandlers cwf.SocketHandlers
// EntryPath is path of url (ex: /web)
EntryPath string
} }
func (sh *ServletHandlers) Init() error { // func (sh *ServletHandlers) Init() error {
if err := sh.SocketHandlers.Init(); nil != err { // if err := sh.SocketHandlers.Init(); nil != err {
return err // return err
} // }
return nil // return nil
} // }
// OnConnect invoked when client is connected // // OnConnect invoked when client is connected
// If you override ths method, must call // // If you override ths method, must call
func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) { // func (sh *ServletHandlers) OnConnect(soc *cwf.Socket) {
sh.SocketHandlers.OnConnect(soc) // sh.SocketHandlers.OnConnect(soc)
} // }
// OnDisconnect invoked when client is disconnected // // OnDisconnect invoked when client is disconnected
// If you override ths method, must call // // If you override ths method, must call
func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) { // func (sh *ServletHandlers) OnDisconnect(soc *cwf.Socket) {
sh.SocketHandlers.OnDisconnect(soc) // sh.SocketHandlers.OnDisconnect(soc)
} // }
// Destroy invoked when server is stopped // // Destroy invoked when server is stopped
// If you override ths method, must call // // If you override ths method, must call
func (sh *ServletHandlers) Destroy() { // func (sh *ServletHandlers) Destroy() {
sh.SocketHandlers.Destroy() // sh.SocketHandlers.Destroy()
} // }
func (sh *ServletHandlers) Validate() { // func (sh *ServletHandlers) Validate() {
sh.SocketHandlers.Validate() // sh.SocketHandlers.Validate()
if "" == sh.EntryPath { // if "" == sh.EntryPath {
logging.Logger().Panic("Geteway Server: The path of entry must be specified") // logging.Logger().Panic("Geteway Server: The path of entry must be specified")
} // }
} // }