diff --git a/client/client.go b/client/client.go index c826db6..65a3f66 100644 --- a/client/client.go +++ b/client/client.go @@ -10,13 +10,16 @@ import ( logging "git.loafle.net/commons/logging-go" "git.loafle.net/commons/rpc-go/protocol" "git.loafle.net/commons/rpc-go/registry" + csc "git.loafle.net/commons/server-go/client" ) var uint64Type = reflect.TypeOf(uint64(0)) type Client struct { + Connector csc.Connector Codec protocol.ClientCodec RPCInvoker registry.RPCInvoker + Name string stopChan chan struct{} stopWg sync.WaitGroup @@ -29,11 +32,19 @@ type Client struct { func (c *Client) Start(readChan <-chan []byte, writeChan chan<- []byte) error { if c.stopChan != nil { - return fmt.Errorf("Client: already running. Stop it before starting it again") + return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader()) + } + + if nil == c.Connector { + return fmt.Errorf("%s Connector is not valid", c.logHeader()) + } + + if err := c.Connector.Validate(); nil != err { + return err } if nil == c.Codec { - return fmt.Errorf("Client: Codec is not valid") + return fmt.Errorf("%s Codec is not valid", c.logHeader()) } c.stopChan = make(chan struct{}) @@ -46,7 +57,7 @@ func (c *Client) Start(readChan <-chan []byte, writeChan chan<- []byte) error { func (c *Client) Stop(ctx context.Context) error { if c.stopChan == nil { - return fmt.Errorf("Client: must be started before stopping it") + return fmt.Errorf("%s must be started before stopping it", c.logHeader()) } close(c.stopChan) c.stopWg.Wait() @@ -55,6 +66,10 @@ func (c *Client) Stop(ctx context.Context) error { return nil } +func (c *Client) logHeader() string { + return fmt.Sprintf("RPC Client[%s]:", c.Name) +} + func (c *Client) Send(method string, params ...interface{}) error { rs, err := c.internalSend(false, nil, method, params...) if nil != err { @@ -93,7 +108,7 @@ func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method s return rs.clientError case <-t.C: rs.cancel() - return newError(method, params, fmt.Errorf("Timeout")) + return newError(method, params, fmt.Errorf("%s Timeout", c.logHeader())) } } @@ -119,12 +134,12 @@ func (c *Client) internalSend(hasResponse bool, result interface{}, method strin default: if !hasResponse { releaseRequestState(rs) - return nil, newError(method, params, fmt.Errorf("Request Queue overflow")) + return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader())) } select { case oldRS := <-c.requestQueueChan: if nil != oldRS.doneChan { - oldRS.setError(fmt.Errorf("Request Queue overflow")) + oldRS.setError(fmt.Errorf("%s Request Queue overflow", c.logHeader())) oldRS.done() } else { releaseRequestState(oldRS) @@ -136,7 +151,7 @@ func (c *Client) internalSend(hasResponse bool, result interface{}, method strin return rs, nil default: releaseRequestState(rs) - return nil, newError(method, params, fmt.Errorf("Request Queue overflow")) + return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader())) } } } @@ -210,7 +225,7 @@ LOOP: select { case writeChan <- message: default: - rs.setError(fmt.Errorf("Client: cannot send request")) + rs.setError(fmt.Errorf("%s cannot send request", c.logHeader())) rs.done() continue LOOP } @@ -250,7 +265,7 @@ LOOP: // notification notiCodec, err := resCodec.Notification() if nil != err { - logging.Logger().Warnf("Client: notification error %v", err) + logging.Logger().Warnf("%s notification error %v", c.logHeader(), err) continue LOOP } @@ -275,7 +290,7 @@ func (c *Client) handleResponse(resCodec protocol.ClientResponseCodec) { id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint() _rs, ok := c.pendingRequests.Load(id) if !ok { - logging.Logger().Warnf("Client: unexpected ID=[%d] obtained from server", id) + logging.Logger().Warnf("%s unexpected ID=[%d] obtained from server", c.logHeader(), id) return } rs := _rs.(*requestState) @@ -294,12 +309,12 @@ func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec) }() if nil == c.RPCInvoker { - logging.Logger().Warnf("Client: received notification method[%s] but RPC Invoker is not exist", notiCodec.Method()) + logging.Logger().Warnf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method()) return } _, err := c.RPCInvoker.Invoke(notiCodec) if nil != err { - logging.Logger().Errorf("Client: invoking of notification method[%s] has been failed %v", notiCodec.Method(), err) + logging.Logger().Errorf("%s invoking of notification method[%s] has been failed %v", notiCodec.Method(), c.logHeader(), err) } } diff --git a/glide.yaml b/glide.yaml index bd908b7..1df4534 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,3 +4,4 @@ import: subpackages: - encoding/json - reflect +- package: git.loafle.net/commons/server-go