rpc-go/client/client.go

333 lines
7.1 KiB
Go
Raw Normal View History

2018-04-11 14:16:41 +00:00
package client
import (
"context"
"fmt"
"reflect"
"sync"
"time"
2018-05-11 04:20:24 +00:00
"git.loafle.net/commons/logging-go"
2018-04-11 14:16:41 +00:00
"git.loafle.net/commons/rpc-go/protocol"
2018-05-11 04:20:24 +00:00
csc "git.loafle.net/commons/server-go/client"
2018-04-11 14:16:41 +00:00
)
var uint64Type = reflect.TypeOf(uint64(0))
type Client struct {
2018-05-02 08:47:34 +00:00
ClientHandler ClientHandler
2018-04-11 14:16:41 +00:00
2018-05-11 04:20:24 +00:00
ctx csc.ClientCtx
2018-04-11 14:16:41 +00:00
stopChan chan struct{}
stopWg sync.WaitGroup
requestID uint64
requestQueueChan chan *requestState
pendingRequests sync.Map
}
2018-04-12 01:55:29 +00:00
func (c *Client) Start() error {
2018-04-11 14:16:41 +00:00
if c.stopChan != nil {
2018-04-12 01:45:32 +00:00
return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader())
}
2018-05-02 08:47:34 +00:00
if nil == c.ClientHandler {
return fmt.Errorf("%s ClientHandler must be specified", c.logHeader())
2018-04-12 01:45:32 +00:00
}
2018-05-02 08:47:34 +00:00
if err := c.ClientHandler.Validate(); nil != err {
return fmt.Errorf("%s validate error %v", c.logHeader(), err)
2018-04-11 14:16:41 +00:00
}
2018-05-02 08:47:34 +00:00
c.ctx = c.ClientHandler.ClientCtx()
if nil == c.ctx {
return fmt.Errorf("%s ServerCtx is nil", c.logHeader())
2018-04-11 14:16:41 +00:00
}
2018-05-02 08:47:34 +00:00
if err := c.ClientHandler.Init(c.ctx); nil != err {
return fmt.Errorf("%s Init error %v", c.logHeader(), err)
2018-04-26 12:22:39 +00:00
}
2018-04-19 11:58:24 +00:00
2018-05-02 08:47:34 +00:00
readChan, writeChan, err := c.ClientHandler.GetConnector().Connect()
2018-04-12 01:55:29 +00:00
if nil != err {
return err
}
2018-05-02 08:47:34 +00:00
c.requestQueueChan = make(chan *requestState, c.ClientHandler.GetPendingRequestCount())
2018-04-11 14:16:41 +00:00
c.stopChan = make(chan struct{})
c.stopWg.Add(1)
go c.handleClient(readChan, writeChan)
return nil
}
func (c *Client) Stop(ctx context.Context) error {
if c.stopChan == nil {
2018-04-12 01:45:32 +00:00
return fmt.Errorf("%s must be started before stopping it", c.logHeader())
2018-04-11 14:16:41 +00:00
}
close(c.stopChan)
c.stopWg.Wait()
2018-05-02 08:47:34 +00:00
c.ClientHandler.Destroy(c.ctx)
2018-04-11 14:16:41 +00:00
c.stopChan = nil
return nil
}
2018-04-12 01:45:32 +00:00
func (c *Client) logHeader() string {
2018-05-02 08:47:34 +00:00
return fmt.Sprintf("RPC Client[%s]:", c.ClientHandler.GetName())
2018-04-12 01:45:32 +00:00
}
2018-04-11 14:16:41 +00:00
func (c *Client) Send(method string, params ...interface{}) error {
2018-04-28 14:58:03 +00:00
_, err := c.internalSend(false, nil, method, params...)
2018-04-11 14:16:41 +00:00
2018-04-28 14:58:03 +00:00
return err
2018-04-11 14:16:41 +00:00
}
func (c *Client) Call(result interface{}, method string, params ...interface{}) error {
2018-05-02 08:47:34 +00:00
return c.CallTimeout(c.ClientHandler.GetRequestTimeout(), result, method, params...)
2018-04-11 14:16:41 +00:00
}
func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method string, params ...interface{}) error {
rs, err := c.internalSend(true, result, method, params...)
if nil != err {
return err
}
t := retainTimer(timeout)
defer func() {
releaseRequestState(rs)
releaseTimer(t)
}()
select {
case <-rs.doneChan:
2018-04-29 10:26:17 +00:00
if nil != rs.clientError {
return rs.clientError
}
2018-04-11 14:16:41 +00:00
result = rs.result
2018-04-29 10:26:17 +00:00
return nil
2018-04-11 14:16:41 +00:00
case <-t.C:
rs.cancel()
2018-04-12 01:45:32 +00:00
return newError(method, params, fmt.Errorf("%s Timeout", c.logHeader()))
2018-04-11 14:16:41 +00:00
}
}
func (c *Client) getRequestID() uint64 {
c.requestID++
return c.requestID
}
func (c *Client) internalSend(hasResponse bool, result interface{}, method string, params ...interface{}) (*requestState, error) {
rs := retainRequestState()
rs.method = method
rs.params = params
if hasResponse {
rs.id = c.getRequestID()
rs.result = result
2018-04-28 14:58:03 +00:00
rs.doneChan = make(chan *requestState, 1)
2018-04-11 14:16:41 +00:00
}
select {
case c.requestQueueChan <- rs:
return rs, nil
default:
if !hasResponse {
releaseRequestState(rs)
2018-04-12 01:45:32 +00:00
return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader()))
2018-04-11 14:16:41 +00:00
}
select {
case oldRS := <-c.requestQueueChan:
if nil != oldRS.doneChan {
2018-04-12 01:45:32 +00:00
oldRS.setError(fmt.Errorf("%s Request Queue overflow", c.logHeader()))
2018-04-11 14:16:41 +00:00
oldRS.done()
} else {
releaseRequestState(oldRS)
}
default:
}
select {
case c.requestQueueChan <- rs:
return rs, nil
default:
releaseRequestState(rs)
2018-04-12 01:45:32 +00:00
return nil, newError(method, params, fmt.Errorf("%s Request Queue overflow", c.logHeader()))
2018-04-11 14:16:41 +00:00
}
}
}
func (c *Client) handleClient(readChan <-chan []byte, writeChan chan<- []byte) {
defer func() {
2018-04-12 01:55:29 +00:00
2018-05-02 08:47:34 +00:00
if err := c.ClientHandler.GetConnector().Disconnect(); nil != err {
2018-04-12 01:55:29 +00:00
logging.Logger().Warn(err)
}
2018-05-02 08:47:34 +00:00
c.ClientHandler.OnStop(c.ctx)
2018-04-12 01:55:29 +00:00
logging.Logger().Infof("%s Stopped", c.logHeader())
2018-04-11 14:16:41 +00:00
c.stopWg.Done()
}()
2018-05-02 08:47:34 +00:00
if err := c.ClientHandler.OnStart(c.ctx); nil != err {
logging.Logger().Error(err)
return
}
2018-04-11 14:16:41 +00:00
stopChan := make(chan struct{})
sendDoneChan := make(chan error)
receiveDoneChan := make(chan error)
go c.handleSend(stopChan, sendDoneChan, writeChan)
go c.handleReceive(stopChan, receiveDoneChan, readChan)
2018-04-12 01:55:29 +00:00
logging.Logger().Infof("%s Started", c.logHeader())
2018-04-11 14:16:41 +00:00
select {
case <-sendDoneChan:
close(stopChan)
<-receiveDoneChan
case <-receiveDoneChan:
close(stopChan)
<-sendDoneChan
case <-c.stopChan:
close(stopChan)
<-sendDoneChan
<-receiveDoneChan
}
}
func (c *Client) handleSend(stopChan <-chan struct{}, doneChan chan<- error, writeChan chan<- []byte) {
var (
rs *requestState
id interface{}
message []byte
err error
ok bool
)
defer func() {
doneChan <- err
}()
LOOP:
for {
select {
case rs, ok = <-c.requestQueueChan:
if !ok {
return
}
if rs.isCanceled() {
if nil != rs.doneChan {
rs.done()
} else {
releaseRequestState(rs)
}
continue LOOP
}
id = nil
if 0 < rs.id {
id = rs.id
}
2018-05-02 08:47:34 +00:00
message, err = c.ClientHandler.GetRPCCodec().NewRequest(rs.method, rs.params, id)
2018-04-11 14:16:41 +00:00
if nil != err {
rs.setError(err)
rs.done()
continue LOOP
}
select {
case writeChan <- message:
default:
2018-04-12 01:45:32 +00:00
rs.setError(fmt.Errorf("%s cannot send request", c.logHeader()))
2018-04-11 14:16:41 +00:00
rs.done()
continue LOOP
}
if 0 < rs.id {
c.pendingRequests.Store(rs.id, rs)
}
2018-04-26 12:22:39 +00:00
case <-stopChan:
2018-04-11 14:16:41 +00:00
return
}
}
}
func (c *Client) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, readChan <-chan []byte) {
var (
message []byte
err error
ok bool
)
defer func() {
doneChan <- err
}()
LOOP:
for {
select {
case message, ok = <-readChan:
if !ok {
return
}
2018-05-02 08:47:34 +00:00
resCodec, err := c.ClientHandler.GetRPCCodec().NewResponse(message)
2018-04-11 14:16:41 +00:00
if nil != err {
2018-04-13 11:24:22 +00:00
logging.Logger().Debug(err)
2018-04-11 14:16:41 +00:00
continue LOOP
}
if nil == resCodec.ID() {
// notification
notiCodec, err := resCodec.Notification()
if nil != err {
2018-04-12 01:45:32 +00:00
logging.Logger().Warnf("%s notification error %v", c.logHeader(), err)
2018-04-11 14:16:41 +00:00
continue LOOP
}
go c.handleNotification(notiCodec)
} else {
// response
go c.handleResponse(resCodec)
}
case <-stopChan:
return
}
}
}
func (c *Client) handleResponse(resCodec protocol.ClientResponseCodec) {
id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint()
_rs, ok := c.pendingRequests.Load(id)
if !ok {
2018-04-12 01:45:32 +00:00
logging.Logger().Warnf("%s unexpected ID=[%d] obtained from server", c.logHeader(), id)
2018-04-11 14:16:41 +00:00
return
}
rs := _rs.(*requestState)
2018-04-29 10:26:17 +00:00
if nil != resCodec.Error() {
rs.setError(resCodec.Error())
} else {
err := resCodec.Result(rs.result)
if nil != err {
rs.setError(err)
}
2018-04-11 14:16:41 +00:00
}
rs.done()
}
func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec) {
2018-05-02 08:47:34 +00:00
if nil == c.ClientHandler.GetRPCInvoker() {
2018-04-12 01:45:32 +00:00
logging.Logger().Warnf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method())
2018-04-11 14:16:41 +00:00
return
}
2018-05-02 08:47:34 +00:00
_, err := c.ClientHandler.GetRPCInvoker().Invoke(notiCodec)
2018-04-11 14:16:41 +00:00
if nil != err {
2018-04-19 11:48:37 +00:00
logging.Logger().Errorf("%s invoking of notification method[%s] has been failed %v", c.logHeader(), notiCodec.Method(), err)
2018-04-11 14:16:41 +00:00
}
}