rpc/client/client.go

435 lines
9.2 KiB
Go
Raw Normal View History

2017-10-31 09:25:44 +00:00
package client
import (
"fmt"
2017-11-23 08:50:08 +00:00
"io"
2017-11-01 03:10:39 +00:00
"log"
2017-11-01 09:01:23 +00:00
"reflect"
2017-10-31 09:25:44 +00:00
"runtime"
"sync"
"sync/atomic"
"time"
2017-11-23 08:14:35 +00:00
"git.loafle.net/commons_go/logging"
2017-10-31 09:25:44 +00:00
"git.loafle.net/commons_go/rpc/protocol"
)
2017-11-29 09:55:24 +00:00
func New(ch ClientHandler, rwcHandler ClientReadWriteCloseHandler) Client {
2017-10-31 09:25:44 +00:00
c := &client{
2017-11-29 09:55:24 +00:00
ch: ch,
rwcHandler: rwcHandler,
2017-10-31 09:25:44 +00:00
}
return c
}
type Client interface {
2017-11-01 03:10:39 +00:00
Connect() error
Close()
2017-11-26 10:15:51 +00:00
Send(method string, args ...interface{}) (err error)
2017-11-02 06:39:30 +00:00
Call(result interface{}, method string, args ...interface{}) error
CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error)
2017-10-31 09:25:44 +00:00
}
type client struct {
2017-11-29 09:55:24 +00:00
ctx ClientContext
ch ClientHandler
rwcHandler ClientReadWriteCloseHandler
2017-10-31 09:25:44 +00:00
2018-03-23 09:45:53 +00:00
conn interface{}
2017-10-31 09:25:44 +00:00
pendingRequestsCount uint32
2017-11-26 10:15:51 +00:00
pendingRequests map[uint64]*RequestState
2017-11-01 03:10:39 +00:00
pendingRequestsLock sync.Mutex
2017-10-31 09:25:44 +00:00
2017-11-26 10:15:51 +00:00
requestQueueChan chan *RequestState
2017-10-31 09:25:44 +00:00
stopChan chan struct{}
stopWg sync.WaitGroup
2018-03-23 07:43:05 +00:00
requestMtx sync.Mutex
responseMtx sync.Mutex
2017-10-31 09:25:44 +00:00
}
2017-11-01 03:10:39 +00:00
func (c *client) Connect() error {
var err error
2017-11-29 09:55:24 +00:00
if nil == c.ch {
2018-03-23 07:43:05 +00:00
return fmt.Errorf("RPC Client: Client handler must be specified")
2017-11-29 09:55:24 +00:00
}
2017-10-31 09:25:44 +00:00
c.ch.Validate()
2017-11-29 09:55:24 +00:00
if nil == c.rwcHandler {
2018-03-23 07:43:05 +00:00
return fmt.Errorf("RPC Client: Client RWC handler must be specified")
2017-11-29 09:55:24 +00:00
}
c.rwcHandler.Validate()
2017-10-31 09:25:44 +00:00
if c.stopChan != nil {
2017-12-01 13:10:48 +00:00
return fmt.Errorf("RPC Client: the given client is already started. Call Client.Stop() before calling Client.Start() again")
2017-10-31 09:25:44 +00:00
}
2017-11-29 09:55:24 +00:00
c.ctx = c.ch.ClientContext(nil)
if err := c.ch.Init(c.ctx); nil != err {
2018-03-23 07:43:05 +00:00
return fmt.Errorf("RPC Client: Initialization of client has been failed %v", err)
2017-11-29 09:55:24 +00:00
}
2017-10-31 09:25:44 +00:00
2017-11-30 03:36:03 +00:00
if c.conn, err = c.rwcHandler.Connect(c.ctx); nil != err {
2017-11-01 03:10:39 +00:00
return err
}
2018-03-23 07:43:05 +00:00
2017-10-31 09:25:44 +00:00
c.stopChan = make(chan struct{})
2017-11-26 10:15:51 +00:00
c.requestQueueChan = make(chan *RequestState, c.ch.GetPendingRequests())
c.pendingRequests = make(map[uint64]*RequestState)
2017-10-31 09:25:44 +00:00
2017-11-01 03:10:39 +00:00
go c.handleRPC()
2017-10-31 11:53:24 +00:00
2017-11-01 03:10:39 +00:00
return nil
2017-10-31 09:25:44 +00:00
}
2017-11-01 03:10:39 +00:00
func (c *client) Close() {
2017-10-31 09:25:44 +00:00
if c.stopChan == nil {
2018-03-21 11:26:36 +00:00
logging.Logger().Warnf("RPC Client: the client must be started before stopping it")
2017-12-01 13:10:48 +00:00
return
2017-10-31 09:25:44 +00:00
}
2017-11-29 09:55:24 +00:00
c.ch.Destroy(c.ctx)
2017-10-31 09:25:44 +00:00
close(c.stopChan)
c.stopWg.Wait()
c.stopChan = nil
2017-11-29 09:55:24 +00:00
2018-03-21 11:26:36 +00:00
logging.Logger().Infof("RPC Client: stopped")
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
func (c *client) Send(method string, args ...interface{}) (err error) {
2017-11-29 09:55:24 +00:00
var rs *RequestState
if rs, err = c.send(true, false, nil, method, args...); nil != err {
2017-11-02 06:39:30 +00:00
return
}
select {
2017-11-29 09:55:24 +00:00
case <-rs.DoneChan:
err = rs.Error
releaseCallState(rs)
2017-11-02 06:39:30 +00:00
}
return
2017-10-31 09:25:44 +00:00
}
2017-11-02 06:39:30 +00:00
func (c *client) Call(result interface{}, method string, args ...interface{}) error {
return c.CallTimeout(c.ch.GetRequestTimeout(), result, method, args...)
2017-10-31 09:25:44 +00:00
}
2017-11-02 06:39:30 +00:00
func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) {
2017-11-29 09:55:24 +00:00
var rs *RequestState
if rs, err = c.send(true, true, result, method, args...); nil != err {
2017-10-31 09:25:44 +00:00
return
}
t := retainTimer(timeout)
select {
2017-11-29 09:55:24 +00:00
case <-rs.DoneChan:
result, err = rs.Result, rs.Error
releaseCallState(rs)
2017-10-31 09:25:44 +00:00
case <-t.C:
2017-11-29 09:55:24 +00:00
rs.Cancel()
2017-10-31 09:25:44 +00:00
err = getClientTimeoutError(c, timeout)
}
releaseTimer(t)
2017-11-01 09:41:40 +00:00
return
2017-10-31 09:25:44 +00:00
}
2017-11-29 09:55:24 +00:00
func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (rs *RequestState, err error) {
2017-10-31 09:25:44 +00:00
if !hasResponse {
usePool = true
}
if usePool {
2017-11-29 09:55:24 +00:00
rs = retainRequestState()
2017-10-31 09:25:44 +00:00
} else {
2017-11-29 09:55:24 +00:00
rs = &RequestState{}
2017-10-31 09:25:44 +00:00
}
2017-11-29 09:55:24 +00:00
rs.hasResponse = hasResponse
rs.Method = method
rs.Args = args
rs.DoneChan = make(chan *RequestState, 1)
2017-10-31 09:25:44 +00:00
if hasResponse {
2017-11-29 09:55:24 +00:00
rs.ID = c.ch.GetRequestID()
rs.Result = result
2017-10-31 09:25:44 +00:00
}
select {
2017-11-29 09:55:24 +00:00
case c.requestQueueChan <- rs:
return rs, nil
2017-10-31 09:25:44 +00:00
default:
// Try substituting the oldest async request by the new one
// on requests' queue overflow.
// This increases the chances for new request to succeed
// without timeout.
if !hasResponse {
// Immediately notify the caller not interested
// in the response on requests' queue overflow, since
// there are no other ways to notify it later.
2017-11-29 09:55:24 +00:00
releaseCallState(rs)
2017-10-31 09:25:44 +00:00
return nil, getClientOverflowError(c)
}
select {
case rcs := <-c.requestQueueChan:
if rcs.DoneChan != nil {
rcs.Error = getClientOverflowError(c)
//close(rcs.DoneChan)
2017-11-22 11:55:10 +00:00
rcs.Done()
2017-10-31 09:25:44 +00:00
} else {
2017-11-26 10:15:51 +00:00
releaseCallState(rcs)
2017-10-31 09:25:44 +00:00
}
default:
}
select {
2017-11-29 09:55:24 +00:00
case c.requestQueueChan <- rs:
return rs, nil
2017-10-31 09:25:44 +00:00
default:
// Release m even if usePool = true, since m wasn't exposed
// to the caller yet.
2017-11-29 09:55:24 +00:00
releaseCallState(rs)
2017-10-31 09:25:44 +00:00
return nil, getClientOverflowError(c)
}
}
}
2017-11-01 03:10:39 +00:00
func (c *client) handleRPC() {
subStopChan := make(chan struct{})
2017-10-31 09:25:44 +00:00
writerDone := make(chan error, 1)
2017-11-01 03:10:39 +00:00
go c.rpcWriter(subStopChan, writerDone)
2017-10-31 09:25:44 +00:00
readerDone := make(chan error, 1)
2017-11-01 03:10:39 +00:00
go c.rpcReader(readerDone)
2017-10-31 09:25:44 +00:00
var err error
select {
case err = <-writerDone:
2017-11-01 03:10:39 +00:00
close(subStopChan)
2017-10-31 09:25:44 +00:00
<-readerDone
case err = <-readerDone:
2017-11-01 03:10:39 +00:00
close(subStopChan)
2017-10-31 09:25:44 +00:00
<-writerDone
case <-c.stopChan:
2017-11-01 03:10:39 +00:00
close(subStopChan)
2017-10-31 09:25:44 +00:00
<-readerDone
<-writerDone
}
2017-11-30 04:59:46 +00:00
if nil != c.conn {
c.rwcHandler.Disconnect(c.ctx, c.conn)
}
2017-11-01 03:10:39 +00:00
2017-10-31 09:25:44 +00:00
if err != nil {
//c.LogError("%s", err)
2017-11-01 03:10:39 +00:00
log.Printf("handleRPC: %v", err)
2017-10-31 09:25:44 +00:00
err = &ClientError{
Connection: true,
2017-11-22 11:55:10 +00:00
Err: err,
2017-10-31 09:25:44 +00:00
}
}
2017-11-29 09:55:24 +00:00
for _, rs := range c.pendingRequests {
2017-10-31 11:49:11 +00:00
atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0))
2017-11-29 09:55:24 +00:00
rs.Error = err
if rs.DoneChan != nil {
rs.Done()
2017-10-31 11:49:11 +00:00
}
}
2017-10-31 09:25:44 +00:00
}
2017-11-01 03:10:39 +00:00
func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
2017-10-31 09:25:44 +00:00
var err error
defer func() {
writerDone <- err
}()
for {
2017-11-29 09:55:24 +00:00
var rs *RequestState
2017-10-31 09:25:44 +00:00
select {
2017-11-29 09:55:24 +00:00
case rs = <-c.requestQueueChan:
2017-10-31 09:25:44 +00:00
default:
// Give the last chance for ready goroutines filling c.requestsChan :)
runtime.Gosched()
select {
case <-stopChan:
return
2017-11-29 09:55:24 +00:00
case rs = <-c.requestQueueChan:
2017-10-31 09:25:44 +00:00
}
}
2017-11-29 09:55:24 +00:00
if rs.IsCanceled() {
if nil != rs.DoneChan {
// rs.Error = ErrCanceled
2017-10-31 09:25:44 +00:00
// close(m.done)
2017-11-29 09:55:24 +00:00
rs.Done()
2017-10-31 09:25:44 +00:00
} else {
2017-11-29 09:55:24 +00:00
releaseCallState(rs)
2017-10-31 09:25:44 +00:00
}
continue
}
2017-11-29 09:55:24 +00:00
if rs.hasResponse {
2017-11-01 03:10:39 +00:00
c.pendingRequestsLock.Lock()
n := len(c.pendingRequests)
2017-11-29 09:55:24 +00:00
c.pendingRequests[rs.ID] = rs
2017-11-01 03:10:39 +00:00
c.pendingRequestsLock.Unlock()
2017-10-31 09:25:44 +00:00
atomic.AddUint32(&c.pendingRequestsCount, 1)
if n > 10*c.ch.GetPendingRequests() {
err = fmt.Errorf("Client: The server didn't return %d responses yet. Closing server connection in order to prevent client resource leaks", n)
2017-11-23 08:14:35 +00:00
logging.Logger().Error(err.Error())
continue
2017-10-31 09:25:44 +00:00
}
}
2017-11-26 10:15:51 +00:00
2017-11-23 08:43:22 +00:00
var requestID interface{}
2017-11-29 09:55:24 +00:00
if 0 < rs.ID {
requestID = rs.ID
2017-11-23 08:43:22 +00:00
}
2017-11-29 09:55:24 +00:00
2017-11-30 04:59:46 +00:00
if nil == c.conn {
err = io.EOF
return
}
2018-03-23 07:43:05 +00:00
c.requestMtx.Lock()
2017-11-29 09:55:24 +00:00
err = c.rwcHandler.WriteRequest(c.ctx, c.ch.GetCodec(), c.conn, rs.Method, rs.Args, requestID)
2018-03-23 07:43:05 +00:00
c.requestMtx.Unlock()
2017-11-29 09:55:24 +00:00
if !rs.hasResponse {
rs.Error = err
rs.Done()
2017-10-31 09:25:44 +00:00
}
2017-11-01 10:29:27 +00:00
if nil != err {
2017-11-23 08:53:03 +00:00
if err == io.ErrUnexpectedEOF || err == io.EOF {
2018-03-21 11:26:36 +00:00
logging.Logger().Infof("Client: disconnected from server")
2017-11-23 08:53:03 +00:00
return
}
2017-10-31 09:25:44 +00:00
err = fmt.Errorf("Client: Cannot send request to wire: [%s]", err)
2018-03-22 16:37:28 +00:00
logging.Logger().Error(err)
2017-11-23 08:14:35 +00:00
continue
2017-10-31 09:25:44 +00:00
}
}
}
2017-11-01 03:10:39 +00:00
func (c *client) rpcReader(readerDone chan<- error) {
2017-10-31 09:25:44 +00:00
var err error
defer func() {
readerDone <- err
}()
for {
2017-11-30 04:59:46 +00:00
if nil == c.conn {
err = io.EOF
return
}
2018-03-23 07:43:05 +00:00
c.responseMtx.Lock()
2018-03-23 09:45:53 +00:00
resCodec, err := c.rwcHandler.ReadResponse(c.ctx, c.ch.GetCodec(), c.conn)
2018-03-23 07:43:05 +00:00
c.responseMtx.Unlock()
2017-10-31 09:25:44 +00:00
if nil != err {
2017-11-23 08:50:08 +00:00
if err == io.ErrUnexpectedEOF || err == io.EOF {
2018-03-21 11:26:36 +00:00
logging.Logger().Infof("Client: disconnected from server")
2017-11-23 08:50:08 +00:00
return
}
2018-03-22 16:37:28 +00:00
logging.Logger().Errorf("Client: Cannot decode response or notify: [%s]", err)
2017-11-23 08:43:22 +00:00
continue
2017-10-31 09:25:44 +00:00
}
2017-11-28 16:19:03 +00:00
if nil != resCodec.ID() {
err = c.handleResponse(resCodec)
} else {
err = c.handleNotification(resCodec)
2017-10-31 09:25:44 +00:00
}
2017-11-26 10:15:51 +00:00
2017-10-31 09:25:44 +00:00
if nil != err {
2017-11-23 08:43:22 +00:00
logging.Logger().Error(err.Error())
continue
2017-10-31 09:25:44 +00:00
}
}
}
2017-11-28 16:19:03 +00:00
func (c *client) handleResponse(resCodec protocol.ClientResponseCodec) error {
2017-11-01 03:10:39 +00:00
c.pendingRequestsLock.Lock()
2017-11-26 10:15:51 +00:00
id := reflect.ValueOf(resCodec.ID()).Convert(uint64Type).Uint()
2017-11-01 09:01:23 +00:00
2017-11-29 09:55:24 +00:00
rs, ok := c.pendingRequests[id]
2017-10-31 09:25:44 +00:00
if ok {
2017-11-01 09:01:23 +00:00
delete(c.pendingRequests, id)
2017-10-31 09:25:44 +00:00
}
2017-11-01 03:10:39 +00:00
c.pendingRequestsLock.Unlock()
2017-10-31 09:25:44 +00:00
if !ok {
2017-11-26 10:15:51 +00:00
return fmt.Errorf("Client: Unexpected ID=[%v] obtained from server", resCodec.ID())
2017-10-31 09:25:44 +00:00
}
atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0))
2017-11-29 09:55:24 +00:00
if err := resCodec.Result(rs.Result); nil != err {
2018-03-22 16:00:11 +00:00
logging.Logger().Errorf("responseHandle:%v", err)
2017-11-01 10:03:06 +00:00
}
2017-11-26 10:15:51 +00:00
if err := resCodec.Error(); nil != err {
2018-03-22 16:00:11 +00:00
logging.Logger().Errorf("responseHandle:%v", err)
2017-11-29 09:55:24 +00:00
// rs.Error = &ClientError{
2017-10-31 09:25:44 +00:00
// Server: true,
// err: fmt.Errorf("gorpc.Client: [%s]. Server error: [%s]", c.Addr, wr.Error),
// }
}
2017-11-29 09:55:24 +00:00
rs.Done()
2017-10-31 09:25:44 +00:00
return nil
}
2017-11-28 16:19:03 +00:00
func (c *client) handleNotification(resCodec protocol.ClientResponseCodec) error {
notiCodec, err := resCodec.Notification()
2017-11-26 10:15:51 +00:00
if nil != err {
return err
}
2017-12-01 07:48:40 +00:00
if nil == c.ch.GetRPCInvoker() {
2017-11-30 05:55:48 +00:00
params, err := notiCodec.Params()
if nil != err {
return err
}
return fmt.Errorf("Client: Get Notification[method: %s, params: %v]. But RPC registry is not specified", notiCodec.Method(), params)
}
2017-12-01 07:48:40 +00:00
_, err = c.ch.GetRPCInvoker().Invoke(notiCodec)
2017-10-31 09:25:44 +00:00
return err
}
func getClientTimeoutError(c *client, timeout time.Duration) error {
err := fmt.Errorf("Client: Cannot obtain response during timeout=%s", timeout)
//c.LogError("%s", err)
return &ClientError{
Timeout: true,
2017-11-22 11:55:10 +00:00
Err: err,
2017-10-31 09:25:44 +00:00
}
}
func getClientOverflowError(c *client) error {
err := fmt.Errorf("Client: Requests' queue with size=%d is overflown. Try increasing Client.PendingRequests value", cap(c.requestQueueChan))
//c.LogError("%s", err)
return &ClientError{
Overflow: true,
2017-11-22 11:55:10 +00:00
Err: err,
2017-10-31 09:25:44 +00:00
}
}