diff --git a/config.json b/config.json index 0b2a307..75804d7 100644 --- a/config.json +++ b/config.json @@ -1,6 +1,6 @@ { "server": { - "addr": ":80", + "addr": ":18081", "tls": false }, "websocket": { diff --git a/debug b/debug index 0ddcd00..78fced1 100755 Binary files a/debug and b/debug differ diff --git a/main.go b/main.go index b515dfc..a745567 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "path/filepath" "git.loafle.net/overflow/overflow_service_websocket/config" - "git.loafle.net/overflow/overflow_service_websocket/protocol/jsonrpc" "git.loafle.net/overflow/overflow_service_websocket/server" ) @@ -46,7 +45,7 @@ func main() { // Config TLS ws := server.New(sConfig) - ws.RegistProtocol("jsonrpc", jsonrpc.NewHandler()) + ws.OnConnection(func(c server.Client) { log.Println("Client have been connected") c.OnDisconnect(func(c server.Client) { diff --git a/protocol/hadler.go b/protocol/hadler.go deleted file mode 100644 index ac516b2..0000000 --- a/protocol/hadler.go +++ /dev/null @@ -1,35 +0,0 @@ -package protocol - -import "errors" - -type ErrorCode int - -const ( - E_PARSE ErrorCode = -32700 - E_INVALID_REQ ErrorCode = -32600 - E_NOT_FOUND_METHOD ErrorCode = -32601 - E_INVALID_PARAMS ErrorCode = -32602 - E_INTERNAL ErrorCode = -32603 - E_SERVER ErrorCode = -32000 -) - -var ErrNullResult = errors.New("result is null") - -type Error struct { - Code ErrorCode `json:"code"` - Message string `json:"message"` - Data interface{} `json:"data,omitempty"` -} - -type Handler interface { - Handle([]byte) ([]byte, *Error) -} - -func NewError(code ErrorCode, err error, data interface{}) *Error { - e := &Error{ - Code: code, - Message: err.Error(), - Data: data, - } - return e -} diff --git a/protocol/jsonrpc/rpc.go b/protocol/jsonrpc/rpc.go deleted file mode 100644 index b0b9a60..0000000 --- a/protocol/jsonrpc/rpc.go +++ /dev/null @@ -1,94 +0,0 @@ -package jsonrpc - -import ( - "encoding/json" - "log" - "strings" - - "golang.org/x/net/context" - "google.golang.org/grpc" - - serverGrpc "git.loafle.net/overflow/overflow_api_server/golang" - "git.loafle.net/overflow/overflow_service_websocket/config" - "git.loafle.net/overflow/overflow_service_websocket/pool" - grpcPool "git.loafle.net/overflow/overflow_service_websocket/pool/grpc" - "git.loafle.net/overflow/overflow_service_websocket/protocol" -) - -type Request struct { - Method string `json:"method"` - Params []string `json:"params,omitempty"` -} - -type Response struct { - Result string `json:"result,omitempty"` -} - -// NewHandler returns a new JSON RPC handler. -func NewHandler() protocol.Handler { - c := config.GetConfig().GRpc - - addr := c.Addr - - servicePool, err := grpcPool.New(1, 5, - func(conn *grpc.ClientConn) (interface{}, error) { - return serverGrpc.NewOverflowApiServerClient(conn), nil - }, - func() (*grpc.ClientConn, error) { - return grpc.Dial(addr, grpc.WithInsecure()) - }, - ) - if nil != err { - log.Fatal(err) - return nil - } - h := &handler{ - pool: servicePool, - } - - return h -} - -// Codec creates a CodecRequest to process each request. -type handler struct { - pool pool.Pool -} - -// NewRequest returns a CodecRequest. -func (h *handler) Handle(data []byte) ([]byte, *protocol.Error) { - req := Request{} - err := json.Unmarshal(data, &req) - if nil != err { - return nil, protocol.NewError(protocol.E_PARSE, err, nil) - } - - parts := strings.Split(req.Method, ".") - - si := &serverGrpc.ServerInput{ - Target: parts[0], - Method: parts[1], - Params: req.Params, - } - - c, err := h.pool.Get() - if nil != err { - return nil, protocol.NewError(protocol.E_INTERNAL, err, nil) - } - defer h.pool.Put(c) - client := c.(serverGrpc.OverflowApiServerClient) - out, err := client.Exec(context.Background(), si) - if err != nil { - return nil, protocol.NewError(protocol.E_SERVER, err, err) - } - - res := &Response{ - Result: out.Result, - } - - r, err := json.Marshal(res) - if err != nil { - return nil, protocol.NewError(protocol.E_INTERNAL, err, nil) - } - - return r, nil -} diff --git a/server/client.go b/server/client.go index f7668c5..9981207 100644 --- a/server/client.go +++ b/server/client.go @@ -1,15 +1,17 @@ package server import ( + "context" "encoding/json" "fmt" "io" "log" "net/http" + "strings" "sync" "time" - "git.loafle.net/overflow/overflow_service_websocket/protocol" + serverGrpc "git.loafle.net/overflow/overflow_api_server/golang" "github.com/gorilla/websocket" ) @@ -168,17 +170,26 @@ func (c *client) onMessageReceived(messageType int, r io.Reader) { log.Println(err) } - h := c.server.ProtocolHandler(req.Protocol) - if nil == h { + parts := strings.Split(req.Method, ".") + si := &serverGrpc.ServerInput{ + Target: parts[0], + Method: parts[1], + Params: req.Params, } - hr, perr := h.Handle(*req.Body) - if perr != nil { - c.writeError(req, perr) + grpcPool, err := c.server.GetGRPCPool().Get() + if nil != err { + c.writeError(req, NewError(E_INTERNAL, err, nil)) + } + defer c.server.GetGRPCPool().Put(grpcPool) + grpcClient := grpcPool.(serverGrpc.OverflowApiServerClient) + out, err := grpcClient.Exec(context.Background(), si) + if err != nil { + c.writeError(req, NewError(E_SERVER, err, err)) } - c.writeResult(req, hr) + c.writeResult(req, out.Result) } func (c *client) fireError(err error) { @@ -187,7 +198,7 @@ func (c *client) fireError(err error) { } } -func (c *client) writeResult(r *Request, data []byte) { +func (c *client) writeResult(r *Request, result string) { c.writeMTX.Lock() if writeTimeout := c.server.GetOptions().WriteTimeout; writeTimeout > 0 { // set the write deadline based on the configuration @@ -195,28 +206,27 @@ func (c *client) writeResult(r *Request, data []byte) { log.Println(fmt.Errorf("%v", err)) } - raw := json.RawMessage(data) - res := &Response{ Protocol: r.Protocol, ID: r.ID, - Body: &raw, + Result: &result, Error: nil, } jRes, err := json.Marshal(res) if nil != err { + log.Println(fmt.Errorf("%v", err)) } err = c.conn.WriteMessage(c.messageType, jRes) c.writeMTX.Unlock() if nil != err { - c.Disconnect() + _ = c.Disconnect() } } -func (c *client) writeError(r *Request, perr *protocol.Error) { +func (c *client) writeError(r *Request, perr *Error) { c.writeMTX.Lock() if writeTimeout := c.server.GetOptions().WriteTimeout; writeTimeout > 0 { // set the write deadline based on the configuration @@ -227,18 +237,19 @@ func (c *client) writeError(r *Request, perr *protocol.Error) { res := &Response{ Protocol: r.Protocol, ID: r.ID, - Body: nil, + Result: nil, Error: perr, } jRes, err := json.Marshal(res) if nil != err { + log.Println(fmt.Errorf("%v", err)) } err = c.conn.WriteMessage(c.messageType, jRes) c.writeMTX.Unlock() if nil != err { - c.Disconnect() + _ = c.Disconnect() } } diff --git a/server/protocol.go b/server/protocol.go index b6fead6..24217c6 100644 --- a/server/protocol.go +++ b/server/protocol.go @@ -2,21 +2,49 @@ package server import ( "encoding/json" - - "git.loafle.net/overflow/overflow_service_websocket/protocol" + "errors" ) +type ErrorCode int + +const ( + E_PARSE ErrorCode = -32700 + E_INVALID_REQ ErrorCode = -32600 + E_NOT_FOUND_METHOD ErrorCode = -32601 + E_INVALID_PARAMS ErrorCode = -32602 + E_INTERNAL ErrorCode = -32603 + E_SERVER ErrorCode = -32000 +) + +var ErrNullResult = errors.New("result is null") + +type Error struct { + Code ErrorCode `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + // Request is protocol which Websocket Request type Request struct { Protocol string `json:"protocol"` - ID *json.RawMessage `json:"id"` - Body *json.RawMessage `json:"body"` + Method string `json:"method"` + Params []string `json:"params,omitempty"` + ID *json.RawMessage `json:"id,omitempty"` } // Response is protocol which Websocket Response type Response struct { Protocol string `json:"protocol"` - ID *json.RawMessage `json:"id"` - Error *protocol.Error `json:"error,omitempty"` - Body *json.RawMessage `json:"body"` + Result *string `json:"result,omitempty"` + Error *Error `json:"error,omitempty"` + ID *json.RawMessage `json:"id,omitempty"` +} + +func NewError(code ErrorCode, err error, data interface{}) *Error { + e := &Error{ + Code: code, + Message: err.Error(), + Data: data, + } + return e } diff --git a/server/server.go b/server/server.go index 5215afa..5f53610 100644 --- a/server/server.go +++ b/server/server.go @@ -4,11 +4,14 @@ import ( "fmt" "log" "net/http" - "strings" "sync" - "git.loafle.net/overflow/overflow_service_websocket/protocol" + serverGrpc "git.loafle.net/overflow/overflow_api_server/golang" + "git.loafle.net/overflow/overflow_service_websocket/config" + "git.loafle.net/overflow/overflow_service_websocket/pool" + grpcPool "git.loafle.net/overflow/overflow_service_websocket/pool/grpc" "github.com/gorilla/websocket" + "google.golang.org/grpc" ) type ( @@ -20,8 +23,7 @@ type ( // listens on the config's port, the critical part is the event OnConnection type Server interface { GetOptions() *Options - RegistProtocol(protocol string, h protocol.Handler) - ProtocolHandler(protocol string) protocol.Handler + GetGRPCPool() pool.Pool HTTPHandler() http.Handler HandleConnection(*http.Request, Connection) OnConnection(cb OnConnectionFunc) @@ -35,13 +37,9 @@ type server struct { clients map[string]Client clientMTX sync.Mutex onConnectionListeners []OnConnectionFunc - protocols map[string]protocol.Handler + grpcPool pool.Pool } -var _ Server = &server{} - -var defaultServer = newServer(nil) - // server implementation // New creates a websocket server and returns it @@ -55,7 +53,6 @@ func newServer(o *Options) Server { s := &server{ clients: make(map[string]Client, 100), onConnectionListeners: make([]OnConnectionFunc, 0), - protocols: make(map[string]protocol.Handler, 0), } if nil == o { @@ -65,37 +62,31 @@ func newServer(o *Options) Server { o.Validate() s.options = o + + pool, err := grpcPool.New(1, 5, + func(conn *grpc.ClientConn) (interface{}, error) { + return serverGrpc.NewOverflowApiServerClient(conn), nil + }, + func() (*grpc.ClientConn, error) { + return grpc.Dial(config.GetConfig().GRpc.Addr, grpc.WithInsecure()) + }, + ) + if nil != err { + log.Fatal(err) + return nil + } + + s.grpcPool = pool + return s } -func RegistProtocol(protocol string, h protocol.Handler) { - defaultServer.RegistProtocol(protocol, h) -} - -func (s *server) RegistProtocol(protocol string, h protocol.Handler) { - s.protocols[strings.ToLower(protocol)] = h -} - -func ProtocolHandler(protocol string) protocol.Handler { - return defaultServer.ProtocolHandler(protocol) -} - -func (s *server) ProtocolHandler(protocol string) protocol.Handler { - return s.protocols[protocol] -} - -// GetOptions is function that get option values -func GetOptions() *Options { - return defaultServer.GetOptions() -} - func (s *server) GetOptions() *Options { return s.options } -// Handler is the function that used on http request -func HTTPHandler() http.Handler { - return defaultServer.HTTPHandler() +func (s *server) GetGRPCPool() pool.Pool { + return s.grpcPool } func (s *server) HTTPHandler() http.Handler { @@ -118,10 +109,6 @@ func (s *server) HTTPHandler() http.Handler { }) } -func HandleConnection(r *http.Request, conn Connection) { - defaultServer.HandleConnection(r, conn) -} - func (s *server) HandleConnection(r *http.Request, conn Connection) { clientID := s.options.IDGenerator(r) c := newClient(s, r, conn, clientID) @@ -138,39 +125,19 @@ func (s *server) HandleConnection(r *http.Request, conn Connection) { c.initialize() } -// OnConnection is function that add the callback when client is connected to default Server -func OnConnection(cb OnConnectionFunc) { - defaultServer.OnConnection(cb) -} - func (s *server) OnConnection(cb OnConnectionFunc) { s.onConnectionListeners = append(s.onConnectionListeners, cb) } -// IsConnected is function that check client is connect -func IsConnected(clientID string) bool { - return defaultServer.IsConnected(clientID) -} - func (s *server) IsConnected(clientID string) bool { soc := s.clients[clientID] return soc != nil } -// GetClient is function that return client instance -func GetClient(clientID string) Client { - return defaultServer.GetClient(clientID) -} - func (s *server) GetClient(clientID string) Client { return s.clients[clientID] } -// Disconnect is function that disconnect a client -func Disconnect(clientID string) error { - return defaultServer.Disconnect(clientID) -} - func (s *server) Disconnect(clientID string) error { c := s.clients[clientID]