From 6729b0465a29da4de18c46c41f8f57ccbf255e71 Mon Sep 17 00:00:00 2001 From: crusader Date: Wed, 2 May 2018 17:47:34 +0900 Subject: [PATCH] ing --- client/client-context.go | 19 +++++++ client/client-handler.go | 118 +++++++++++++++++++++++++++++++++++++++ client/client.go | 61 ++++++++++---------- const.go | 8 +++ 4 files changed, 175 insertions(+), 31 deletions(-) create mode 100644 client/client-context.go create mode 100644 client/client-handler.go create mode 100644 const.go diff --git a/client/client-context.go b/client/client-context.go new file mode 100644 index 0000000..3cb37db --- /dev/null +++ b/client/client-context.go @@ -0,0 +1,19 @@ +package client + +import ( + cuc "git.loafle.net/commons/util-go/context" +) + +type ClientCtx interface { + cuc.Context +} + +func NewClientCtx(parent cuc.Context) ClientCtx { + return &clientCtx{ + Context: cuc.NewContext(parent), + } +} + +type clientCtx struct { + cuc.Context +} diff --git a/client/client-handler.go b/client/client-handler.go new file mode 100644 index 0000000..810d7ef --- /dev/null +++ b/client/client-handler.go @@ -0,0 +1,118 @@ +package client + +import ( + "fmt" + "sync/atomic" + "time" + + rpc "git.loafle.net/commons/rpc-go" + "git.loafle.net/commons/rpc-go/protocol" + "git.loafle.net/commons/rpc-go/registry" + csc "git.loafle.net/commons/server-go/client" +) + +type ClientHandler interface { + GetName() string + GetConnector() csc.Connector + GetRPCCodec() protocol.ClientCodec + GetRPCInvoker() registry.RPCInvoker + GetPendingRequestCount() int + GetRequestTimeout() time.Duration + + ClientCtx() ClientCtx + + Init(clientCtx ClientCtx) error + OnStart(clientCtx ClientCtx) error + OnStop(clientCtx ClientCtx) + Destroy(clientCtx ClientCtx) + + Validate() error +} + +type ClientHandlers struct { + // Client name for sending in response headers. + // + // Default client name is used if left blank. + Name string `json:"name,omitempty"` + Connector csc.Connector `json:"-"` + RPCCodec protocol.ClientCodec `json:"-"` + RPCInvoker registry.RPCInvoker `json:"-"` + PendingRequestCount int `json:"pendingRequests,omitempty"` + RequestTimeout time.Duration `json:"requestTimeout,omitempty"` + + validated atomic.Value +} + +func (ch *ClientHandlers) ClientCtx() ClientCtx { + return NewClientCtx(nil) +} + +func (ch *ClientHandlers) Init(clientCtx ClientCtx) error { + return nil +} + +func (ch *ClientHandlers) OnStart(clientCtx ClientCtx) error { + return nil +} + +func (ch *ClientHandlers) OnStop(clientCtx ClientCtx) { + +} + +func (ch *ClientHandlers) Destroy(clientCtx ClientCtx) { + +} + +func (ch *ClientHandlers) GetName() string { + return ch.Name +} + +func (ch *ClientHandlers) GetConnector() csc.Connector { + return ch.Connector +} +func (ch *ClientHandlers) GetRPCCodec() protocol.ClientCodec { + return ch.RPCCodec +} +func (ch *ClientHandlers) GetRPCInvoker() registry.RPCInvoker { + return ch.RPCInvoker +} +func (ch *ClientHandlers) GetPendingRequestCount() int { + return ch.PendingRequestCount +} +func (ch *ClientHandlers) GetRequestTimeout() time.Duration { + return ch.RequestTimeout +} + +func (ch *ClientHandlers) Validate() error { + if nil != ch.validated.Load() { + return nil + } + ch.validated.Store(true) + + if "" == ch.Name { + ch.Name = "Client" + } + if 0 >= ch.PendingRequestCount { + ch.PendingRequestCount = rpc.DefaultPendingRequestCount + } + + if ch.RequestTimeout <= 0 { + ch.RequestTimeout = rpc.DefaultRequestTimeout + } else { + ch.RequestTimeout = ch.RequestTimeout * time.Second + } + + if nil == ch.RPCCodec { + return fmt.Errorf("RPCCodec is not valid") + } + + if nil == ch.Connector { + return fmt.Errorf("Connector is not valid") + } + + if err := ch.Connector.Validate(); nil != err { + return err + } + + return nil +} diff --git a/client/client.go b/client/client.go index a334273..a1429bc 100644 --- a/client/client.go +++ b/client/client.go @@ -9,20 +9,14 @@ import ( logging "git.loafle.net/commons/logging-go" "git.loafle.net/commons/rpc-go/protocol" - "git.loafle.net/commons/rpc-go/registry" - csc "git.loafle.net/commons/server-go/client" ) var uint64Type = reflect.TypeOf(uint64(0)) type Client struct { - Connector csc.Connector - Codec protocol.ClientCodec - RPCInvoker registry.RPCInvoker - Name string - PendingRequests int - RequestTimeout time.Duration + ClientHandler ClientHandler + ctx ClientCtx stopChan chan struct{} stopWg sync.WaitGroup @@ -37,32 +31,28 @@ func (c *Client) Start() error { return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader()) } - if nil == c.Connector { - return fmt.Errorf("%s Connector is not valid", c.logHeader()) + if nil == c.ClientHandler { + return fmt.Errorf("%s ClientHandler must be specified", c.logHeader()) + } + if err := c.ClientHandler.Validate(); nil != err { + return fmt.Errorf("%s validate error %v", c.logHeader(), err) } - if err := c.Connector.Validate(); nil != err { - return err + c.ctx = c.ClientHandler.ClientCtx() + if nil == c.ctx { + return fmt.Errorf("%s ServerCtx is nil", c.logHeader()) } - if nil == c.Codec { - return fmt.Errorf("%s Codec is not valid", c.logHeader()) + if err := c.ClientHandler.Init(c.ctx); nil != err { + return fmt.Errorf("%s Init error %v", c.logHeader(), err) } - if 0 >= c.PendingRequests { - c.PendingRequests = 1024 - } - - if 0 >= c.RequestTimeout { - c.RequestTimeout = 20 * time.Second - } - - readChan, writeChan, err := c.Connector.Connect() + readChan, writeChan, err := c.ClientHandler.GetConnector().Connect() if nil != err { return err } - c.requestQueueChan = make(chan *requestState, c.PendingRequests) + c.requestQueueChan = make(chan *requestState, c.ClientHandler.GetPendingRequestCount()) c.stopChan = make(chan struct{}) c.stopWg.Add(1) @@ -77,13 +67,16 @@ func (c *Client) Stop(ctx context.Context) error { } close(c.stopChan) c.stopWg.Wait() + + c.ClientHandler.Destroy(c.ctx) + c.stopChan = nil return nil } func (c *Client) logHeader() string { - return fmt.Sprintf("RPC Client[%s]:", c.Name) + return fmt.Sprintf("RPC Client[%s]:", c.ClientHandler.GetName()) } func (c *Client) Send(method string, params ...interface{}) error { @@ -93,7 +86,7 @@ func (c *Client) Send(method string, params ...interface{}) error { } func (c *Client) Call(result interface{}, method string, params ...interface{}) error { - return c.CallTimeout(c.RequestTimeout, result, method, params...) + return c.CallTimeout(c.ClientHandler.GetRequestTimeout(), result, method, params...) } func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method string, params ...interface{}) error { @@ -168,14 +161,20 @@ func (c *Client) internalSend(hasResponse bool, result interface{}, method strin func (c *Client) handleClient(readChan <-chan []byte, writeChan chan<- []byte) { defer func() { - if err := c.Connector.Disconnect(); nil != err { + if err := c.ClientHandler.GetConnector().Disconnect(); nil != err { logging.Logger().Warn(err) } + c.ClientHandler.OnStop(c.ctx) logging.Logger().Infof("%s Stopped", c.logHeader()) c.stopWg.Done() }() + if err := c.ClientHandler.OnStart(c.ctx); nil != err { + logging.Logger().Error(err) + return + } + stopChan := make(chan struct{}) sendDoneChan := make(chan error) receiveDoneChan := make(chan error) @@ -232,7 +231,7 @@ LOOP: if 0 < rs.id { id = rs.id } - message, err = c.Codec.NewRequest(rs.method, rs.params, id) + message, err = c.ClientHandler.GetRPCCodec().NewRequest(rs.method, rs.params, id) if nil != err { rs.setError(err) rs.done() @@ -274,7 +273,7 @@ LOOP: if !ok { return } - resCodec, err := c.Codec.NewResponse(message) + resCodec, err := c.ClientHandler.GetRPCCodec().NewResponse(message) if nil != err { logging.Logger().Debug(err) continue LOOP @@ -320,12 +319,12 @@ func (c *Client) handleResponse(resCodec protocol.ClientResponseCodec) { } func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec) { - if nil == c.RPCInvoker { + if nil == c.ClientHandler.GetRPCInvoker() { logging.Logger().Warnf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method()) return } - _, err := c.RPCInvoker.Invoke(notiCodec) + _, err := c.ClientHandler.GetRPCInvoker().Invoke(notiCodec) if nil != err { logging.Logger().Errorf("%s invoking of notification method[%s] has been failed %v", c.logHeader(), notiCodec.Method(), err) } diff --git a/const.go b/const.go new file mode 100644 index 0000000..c35b8ac --- /dev/null +++ b/const.go @@ -0,0 +1,8 @@ +package rpc + +import "time" + +const ( + DefaultPendingRequestCount = 1024 + DefaultRequestTimeout = 20 * time.Second +)