This commit is contained in:
crusader 2018-04-12 10:45:32 +09:00
parent 46b23c5217
commit 903c790624
2 changed files with 28 additions and 12 deletions

View File

@ -10,13 +10,16 @@ import (
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/rpc-go/protocol" "git.loafle.net/commons/rpc-go/protocol"
"git.loafle.net/commons/rpc-go/registry" "git.loafle.net/commons/rpc-go/registry"
csc "git.loafle.net/commons/server-go/client"
) )
var uint64Type = reflect.TypeOf(uint64(0)) var uint64Type = reflect.TypeOf(uint64(0))
type Client struct { type Client struct {
Connector csc.Connector
Codec protocol.ClientCodec Codec protocol.ClientCodec
RPCInvoker registry.RPCInvoker RPCInvoker registry.RPCInvoker
Name string
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
@ -29,11 +32,19 @@ type Client struct {
func (c *Client) Start(readChan <-chan []byte, writeChan chan<- []byte) error { func (c *Client) Start(readChan <-chan []byte, writeChan chan<- []byte) error {
if c.stopChan != nil { if c.stopChan != nil {
return fmt.Errorf("Client: already running. Stop it before starting it again") return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader())
}
if nil == c.Connector {
return fmt.Errorf("%s Connector is not valid", c.logHeader())
}
if err := c.Connector.Validate(); nil != err {
return err
} }
if nil == c.Codec { if nil == c.Codec {
return fmt.Errorf("Client: Codec is not valid") return fmt.Errorf("%s Codec is not valid", c.logHeader())
} }
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
@ -46,7 +57,7 @@ func (c *Client) Start(readChan <-chan []byte, writeChan chan<- []byte) error {
func (c *Client) Stop(ctx context.Context) error { func (c *Client) Stop(ctx context.Context) error {
if c.stopChan == nil { if c.stopChan == nil {
return fmt.Errorf("Client: must be started before stopping it") return fmt.Errorf("%s must be started before stopping it", c.logHeader())
} }
close(c.stopChan) close(c.stopChan)
c.stopWg.Wait() c.stopWg.Wait()
@ -55,6 +66,10 @@ func (c *Client) Stop(ctx context.Context) error {
return nil return nil
} }
func (c *Client) logHeader() string {
return fmt.Sprintf("RPC Client[%s]:", c.Name)
}
func (c *Client) Send(method string, params ...interface{}) error { func (c *Client) Send(method string, params ...interface{}) error {
rs, err := c.internalSend(false, nil, method, params...) rs, err := c.internalSend(false, nil, method, params...)
if nil != err { if nil != err {
@ -93,7 +108,7 @@ func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method s
return rs.clientError return rs.clientError
case <-t.C: case <-t.C:
rs.cancel() rs.cancel()
return newError(method, params, fmt.Errorf("Timeout")) return newError(method, params, fmt.Errorf("%s Timeout", c.logHeader()))
} }
} }
@ -119,12 +134,12 @@ func (c *Client) internalSend(hasResponse bool, result interface{}, method strin
default: default:
if !hasResponse { if !hasResponse {
releaseRequestState(rs) releaseRequestState(rs)
return nil, newError(method, params, fmt.Errorf("Request Queue overflow")) return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader()))
} }
select { select {
case oldRS := <-c.requestQueueChan: case oldRS := <-c.requestQueueChan:
if nil != oldRS.doneChan { if nil != oldRS.doneChan {
oldRS.setError(fmt.Errorf("Request Queue overflow")) oldRS.setError(fmt.Errorf("%s Request Queue overflow", c.logHeader()))
oldRS.done() oldRS.done()
} else { } else {
releaseRequestState(oldRS) releaseRequestState(oldRS)
@ -136,7 +151,7 @@ func (c *Client) internalSend(hasResponse bool, result interface{}, method strin
return rs, nil return rs, nil
default: default:
releaseRequestState(rs) releaseRequestState(rs)
return nil, newError(method, params, fmt.Errorf("Request Queue overflow")) return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader()))
} }
} }
} }
@ -210,7 +225,7 @@ LOOP:
select { select {
case writeChan <- message: case writeChan <- message:
default: default:
rs.setError(fmt.Errorf("Client: cannot send request")) rs.setError(fmt.Errorf("%s cannot send request", c.logHeader()))
rs.done() rs.done()
continue LOOP continue LOOP
} }
@ -250,7 +265,7 @@ LOOP:
// notification // notification
notiCodec, err := resCodec.Notification() notiCodec, err := resCodec.Notification()
if nil != err { if nil != err {
logging.Logger().Warnf("Client: notification error %v", err) logging.Logger().Warnf("%s notification error %v", c.logHeader(), err)
continue LOOP continue LOOP
} }
@ -275,7 +290,7 @@ func (c *Client) handleResponse(resCodec protocol.ClientResponseCodec) {
id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint() id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint()
_rs, ok := c.pendingRequests.Load(id) _rs, ok := c.pendingRequests.Load(id)
if !ok { if !ok {
logging.Logger().Warnf("Client: unexpected ID=[%d] obtained from server", id) logging.Logger().Warnf("%s unexpected ID=[%d] obtained from server", c.logHeader(), id)
return return
} }
rs := _rs.(*requestState) rs := _rs.(*requestState)
@ -294,12 +309,12 @@ func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec)
}() }()
if nil == c.RPCInvoker { if nil == c.RPCInvoker {
logging.Logger().Warnf("Client: received notification method[%s] but RPC Invoker is not exist", notiCodec.Method()) logging.Logger().Warnf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method())
return return
} }
_, err := c.RPCInvoker.Invoke(notiCodec) _, err := c.RPCInvoker.Invoke(notiCodec)
if nil != err { if nil != err {
logging.Logger().Errorf("Client: invoking of notification method[%s] has been failed %v", notiCodec.Method(), err) logging.Logger().Errorf("%s invoking of notification method[%s] has been failed %v", notiCodec.Method(), c.logHeader(), err)
} }
} }

View File

@ -4,3 +4,4 @@ import:
subpackages: subpackages:
- encoding/json - encoding/json
- reflect - reflect
- package: git.loafle.net/commons/server-go