2017-10-31 09:25:44 +00:00
|
|
|
package client
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2017-11-23 08:50:08 +00:00
|
|
|
"io"
|
2017-11-01 03:10:39 +00:00
|
|
|
"log"
|
2017-11-01 03:40:26 +00:00
|
|
|
"net"
|
2017-11-01 09:01:23 +00:00
|
|
|
"reflect"
|
2017-10-31 09:25:44 +00:00
|
|
|
"runtime"
|
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
|
2017-11-23 08:14:35 +00:00
|
|
|
"git.loafle.net/commons_go/logging"
|
2017-10-31 09:25:44 +00:00
|
|
|
"git.loafle.net/commons_go/rpc/protocol"
|
|
|
|
)
|
|
|
|
|
|
|
|
func New(ch ClientHandler) Client {
|
|
|
|
c := &client{
|
|
|
|
ch: ch,
|
|
|
|
}
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
// Client is the public surface of the RPC client.
type Client interface {
	// Connect establishes the connection and starts the background
	// processing goroutines.
	Connect() error
	// Close signals the background goroutines to stop.
	Close()

	// Notify sends method with args without expecting a response payload;
	// it returns once the request has been handed to the wire (or failed).
	Notify(method string, args ...interface{}) (err error)
	// Call sends method with args and waits for the response using the
	// handler's default request timeout. The response is decoded into result.
	Call(result interface{}, method string, args ...interface{}) error
	// CallTimeout behaves like Call but waits at most timeout.
	CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error)
}
|
|
|
|
|
|
|
|
type client struct {
|
|
|
|
ch ClientHandler
|
|
|
|
|
2017-11-01 03:40:26 +00:00
|
|
|
conn net.Conn
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
pendingRequestsCount uint32
|
2017-11-01 08:43:20 +00:00
|
|
|
pendingRequests map[uint64]*CallState
|
2017-11-01 03:10:39 +00:00
|
|
|
pendingRequestsLock sync.Mutex
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
requestQueueChan chan *CallState
|
|
|
|
|
|
|
|
stopChan chan struct{}
|
|
|
|
stopWg sync.WaitGroup
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) Connect() error {
|
|
|
|
var err error
|
2017-10-31 09:25:44 +00:00
|
|
|
c.ch.Validate()
|
|
|
|
|
|
|
|
if c.stopChan != nil {
|
2017-11-01 03:10:39 +00:00
|
|
|
panic("RPC Client: the given client is already started. Call Client.Stop() before calling Client.Start() again!")
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-01 03:40:26 +00:00
|
|
|
if c.conn, err = c.ch.Connect(); nil != err {
|
2017-11-01 03:10:39 +00:00
|
|
|
return err
|
|
|
|
}
|
2017-10-31 09:25:44 +00:00
|
|
|
c.stopChan = make(chan struct{})
|
|
|
|
c.requestQueueChan = make(chan *CallState, c.ch.GetPendingRequests())
|
2017-11-01 08:43:20 +00:00
|
|
|
c.pendingRequests = make(map[uint64]*CallState)
|
2017-10-31 09:25:44 +00:00
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
go c.handleRPC()
|
2017-10-31 11:53:24 +00:00
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
return nil
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) Close() {
|
2017-10-31 09:25:44 +00:00
|
|
|
if c.stopChan == nil {
|
|
|
|
panic("Client: the client must be started before stopping it")
|
|
|
|
}
|
|
|
|
close(c.stopChan)
|
|
|
|
c.stopWg.Wait()
|
|
|
|
c.stopChan = nil
|
|
|
|
}
|
|
|
|
|
2017-11-02 06:39:30 +00:00
|
|
|
func (c *client) Notify(method string, args ...interface{}) (err error) {
|
|
|
|
var cs *CallState
|
|
|
|
if cs, err = c.send(true, false, nil, method, args...); nil != err {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-cs.DoneChan:
|
|
|
|
err = cs.Error
|
2017-11-22 11:55:10 +00:00
|
|
|
ReleaseCallState(cs)
|
2017-11-02 06:39:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-02 06:39:30 +00:00
|
|
|
func (c *client) Call(result interface{}, method string, args ...interface{}) error {
|
|
|
|
return c.CallTimeout(c.ch.GetRequestTimeout(), result, method, args...)
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-02 06:39:30 +00:00
|
|
|
func (c *client) CallTimeout(timeout time.Duration, result interface{}, method string, args ...interface{}) (err error) {
|
2017-10-31 09:25:44 +00:00
|
|
|
var cs *CallState
|
2017-11-02 06:39:30 +00:00
|
|
|
if cs, err = c.send(true, true, result, method, args...); nil != err {
|
2017-10-31 09:25:44 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
t := retainTimer(timeout)
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-cs.DoneChan:
|
|
|
|
result, err = cs.Result, cs.Error
|
2017-11-22 11:55:10 +00:00
|
|
|
ReleaseCallState(cs)
|
2017-10-31 09:25:44 +00:00
|
|
|
case <-t.C:
|
|
|
|
cs.Cancel()
|
|
|
|
err = getClientTimeoutError(c, timeout)
|
|
|
|
}
|
|
|
|
|
|
|
|
releaseTimer(t)
|
|
|
|
|
2017-11-01 09:41:40 +00:00
|
|
|
return
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-02 06:39:30 +00:00
|
|
|
func (c *client) send(usePool bool, hasResponse bool, result interface{}, method string, args ...interface{}) (cs *CallState, err error) {
|
2017-10-31 09:25:44 +00:00
|
|
|
if !hasResponse {
|
|
|
|
usePool = true
|
|
|
|
}
|
|
|
|
|
|
|
|
if usePool {
|
2017-11-22 11:55:10 +00:00
|
|
|
cs = RetainCallState()
|
2017-10-31 09:25:44 +00:00
|
|
|
} else {
|
|
|
|
cs = &CallState{}
|
|
|
|
}
|
|
|
|
|
2017-11-02 06:39:30 +00:00
|
|
|
cs.hasResponse = hasResponse
|
2017-10-31 09:25:44 +00:00
|
|
|
cs.Method = method
|
|
|
|
cs.Args = args
|
2017-11-02 06:39:30 +00:00
|
|
|
cs.DoneChan = make(chan *CallState, 1)
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
if hasResponse {
|
|
|
|
cs.ID = c.ch.GetRequestID()
|
|
|
|
cs.Result = result
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case c.requestQueueChan <- cs:
|
|
|
|
return cs, nil
|
|
|
|
default:
|
|
|
|
// Try substituting the oldest async request by the new one
|
|
|
|
// on requests' queue overflow.
|
|
|
|
// This increases the chances for new request to succeed
|
|
|
|
// without timeout.
|
|
|
|
if !hasResponse {
|
|
|
|
// Immediately notify the caller not interested
|
|
|
|
// in the response on requests' queue overflow, since
|
|
|
|
// there are no other ways to notify it later.
|
2017-11-22 11:55:10 +00:00
|
|
|
ReleaseCallState(cs)
|
2017-10-31 09:25:44 +00:00
|
|
|
return nil, getClientOverflowError(c)
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case rcs := <-c.requestQueueChan:
|
|
|
|
if rcs.DoneChan != nil {
|
|
|
|
rcs.Error = getClientOverflowError(c)
|
|
|
|
//close(rcs.DoneChan)
|
2017-11-22 11:55:10 +00:00
|
|
|
rcs.Done()
|
2017-10-31 09:25:44 +00:00
|
|
|
} else {
|
2017-11-22 11:55:10 +00:00
|
|
|
ReleaseCallState(rcs)
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case c.requestQueueChan <- cs:
|
|
|
|
return cs, nil
|
|
|
|
default:
|
|
|
|
// Release m even if usePool = true, since m wasn't exposed
|
|
|
|
// to the caller yet.
|
2017-11-22 11:55:10 +00:00
|
|
|
ReleaseCallState(cs)
|
2017-10-31 09:25:44 +00:00
|
|
|
return nil, getClientOverflowError(c)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) handleRPC() {
|
|
|
|
subStopChan := make(chan struct{})
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
writerDone := make(chan error, 1)
|
2017-11-01 03:10:39 +00:00
|
|
|
go c.rpcWriter(subStopChan, writerDone)
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
readerDone := make(chan error, 1)
|
2017-11-01 03:10:39 +00:00
|
|
|
go c.rpcReader(readerDone)
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
select {
|
|
|
|
case err = <-writerDone:
|
2017-11-01 03:10:39 +00:00
|
|
|
close(subStopChan)
|
2017-10-31 09:25:44 +00:00
|
|
|
<-readerDone
|
|
|
|
case err = <-readerDone:
|
2017-11-01 03:10:39 +00:00
|
|
|
close(subStopChan)
|
2017-10-31 09:25:44 +00:00
|
|
|
<-writerDone
|
|
|
|
case <-c.stopChan:
|
2017-11-01 03:10:39 +00:00
|
|
|
close(subStopChan)
|
2017-10-31 09:25:44 +00:00
|
|
|
<-readerDone
|
|
|
|
<-writerDone
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:40:26 +00:00
|
|
|
c.conn.Close()
|
2017-11-01 03:10:39 +00:00
|
|
|
|
2017-10-31 09:25:44 +00:00
|
|
|
if err != nil {
|
|
|
|
//c.LogError("%s", err)
|
2017-11-01 03:10:39 +00:00
|
|
|
log.Printf("handleRPC: %v", err)
|
2017-10-31 09:25:44 +00:00
|
|
|
err = &ClientError{
|
|
|
|
Connection: true,
|
2017-11-22 11:55:10 +00:00
|
|
|
Err: err,
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
for _, cs := range c.pendingRequests {
|
2017-10-31 11:49:11 +00:00
|
|
|
atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0))
|
|
|
|
cs.Error = err
|
|
|
|
if cs.DoneChan != nil {
|
2017-11-22 11:55:10 +00:00
|
|
|
cs.Done()
|
2017-10-31 11:49:11 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
|
2017-10-31 09:25:44 +00:00
|
|
|
var err error
|
|
|
|
defer func() {
|
|
|
|
writerDone <- err
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
|
|
|
var cs *CallState
|
|
|
|
|
|
|
|
select {
|
|
|
|
case cs = <-c.requestQueueChan:
|
|
|
|
default:
|
|
|
|
// Give the last chance for ready goroutines filling c.requestsChan :)
|
|
|
|
runtime.Gosched()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-stopChan:
|
|
|
|
return
|
|
|
|
case cs = <-c.requestQueueChan:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if cs.IsCanceled() {
|
|
|
|
if nil != cs.DoneChan {
|
|
|
|
// cs.Error = ErrCanceled
|
|
|
|
// close(m.done)
|
2017-11-22 11:55:10 +00:00
|
|
|
cs.Done()
|
2017-10-31 09:25:44 +00:00
|
|
|
} else {
|
2017-11-22 11:55:10 +00:00
|
|
|
ReleaseCallState(cs)
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-11-02 06:39:30 +00:00
|
|
|
if cs.hasResponse {
|
2017-11-01 03:10:39 +00:00
|
|
|
c.pendingRequestsLock.Lock()
|
|
|
|
n := len(c.pendingRequests)
|
|
|
|
c.pendingRequests[cs.ID] = cs
|
|
|
|
c.pendingRequestsLock.Unlock()
|
2017-10-31 09:25:44 +00:00
|
|
|
atomic.AddUint32(&c.pendingRequestsCount, 1)
|
|
|
|
|
|
|
|
if n > 10*c.ch.GetPendingRequests() {
|
|
|
|
err = fmt.Errorf("Client: The server didn't return %d responses yet. Closing server connection in order to prevent client resource leaks", n)
|
2017-11-23 08:14:35 +00:00
|
|
|
logging.Logger().Error(err.Error())
|
|
|
|
continue
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
}
|
2017-11-23 08:43:22 +00:00
|
|
|
var requestID interface{}
|
|
|
|
if 0 < cs.ID {
|
|
|
|
requestID = cs.ID
|
|
|
|
}
|
|
|
|
err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, requestID)
|
2017-11-02 06:39:30 +00:00
|
|
|
if !cs.hasResponse {
|
|
|
|
cs.Error = err
|
2017-11-22 11:55:10 +00:00
|
|
|
cs.Done()
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
2017-11-01 10:29:27 +00:00
|
|
|
if nil != err {
|
2017-10-31 09:25:44 +00:00
|
|
|
err = fmt.Errorf("Client: Cannot send request to wire: [%s]", err)
|
2017-11-23 08:14:35 +00:00
|
|
|
logging.Logger().Error(err.Error())
|
|
|
|
continue
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) rpcReader(readerDone chan<- error) {
|
2017-10-31 09:25:44 +00:00
|
|
|
var err error
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
if err == nil {
|
|
|
|
err = fmt.Errorf("Client: Panic when reading data from server: %v", r)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
readerDone <- err
|
|
|
|
}()
|
|
|
|
|
|
|
|
for {
|
2017-11-01 03:40:26 +00:00
|
|
|
crn, err := c.ch.GetCodec().NewResponseOrNotify(c.conn)
|
2017-10-31 09:25:44 +00:00
|
|
|
if nil != err {
|
2017-11-23 08:50:08 +00:00
|
|
|
if err == io.ErrUnexpectedEOF || err == io.EOF {
|
|
|
|
logging.Logger().Info("Client: disconnected from server")
|
|
|
|
return
|
|
|
|
}
|
2017-10-31 09:25:44 +00:00
|
|
|
err = fmt.Errorf("Client: Cannot decode response or notify: [%s]", err)
|
2017-11-23 08:43:22 +00:00
|
|
|
logging.Logger().Error(err.Error())
|
|
|
|
continue
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if crn.IsResponse() {
|
2017-11-01 03:10:39 +00:00
|
|
|
err = c.responseHandle(crn.GetResponse())
|
2017-10-31 09:25:44 +00:00
|
|
|
} else {
|
2017-11-01 03:10:39 +00:00
|
|
|
err = c.notifyHandle(crn.GetNotify())
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
if nil != err {
|
2017-11-23 08:43:22 +00:00
|
|
|
logging.Logger().Error(err.Error())
|
|
|
|
continue
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) error {
|
|
|
|
c.pendingRequestsLock.Lock()
|
2017-11-01 09:01:23 +00:00
|
|
|
id := reflect.ValueOf(codecResponse.ID()).Convert(uint64Type).Uint()
|
|
|
|
|
|
|
|
cs, ok := c.pendingRequests[id]
|
2017-10-31 09:25:44 +00:00
|
|
|
if ok {
|
2017-11-01 09:01:23 +00:00
|
|
|
delete(c.pendingRequests, id)
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
2017-11-01 03:10:39 +00:00
|
|
|
c.pendingRequestsLock.Unlock()
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("Client: Unexpected ID=[%v] obtained from server", codecResponse.ID())
|
|
|
|
}
|
|
|
|
|
|
|
|
atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0))
|
|
|
|
|
2017-11-01 10:03:06 +00:00
|
|
|
if err := codecResponse.Result(cs.Result); nil != err {
|
|
|
|
log.Printf("responseHandle:%v", err)
|
|
|
|
}
|
2017-10-31 09:25:44 +00:00
|
|
|
if err := codecResponse.Error(); nil != err {
|
2017-11-01 10:03:06 +00:00
|
|
|
log.Printf("responseHandle:%v", err)
|
2017-10-31 09:25:44 +00:00
|
|
|
// cs.Error = &ClientError{
|
|
|
|
// Server: true,
|
|
|
|
// err: fmt.Errorf("gorpc.Client: [%s]. Server error: [%s]", c.Addr, wr.Error),
|
|
|
|
// }
|
|
|
|
}
|
|
|
|
|
2017-11-22 11:55:10 +00:00
|
|
|
cs.Done()
|
2017-10-31 09:25:44 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-11-01 03:10:39 +00:00
|
|
|
func (c *client) notifyHandle(codecNotify protocol.ClientCodecNotify) error {
|
2017-10-31 09:25:44 +00:00
|
|
|
_, err := c.ch.GetRPCRegistry().Invoke(codecNotify)
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func getClientTimeoutError(c *client, timeout time.Duration) error {
|
|
|
|
err := fmt.Errorf("Client: Cannot obtain response during timeout=%s", timeout)
|
|
|
|
//c.LogError("%s", err)
|
|
|
|
return &ClientError{
|
|
|
|
Timeout: true,
|
2017-11-22 11:55:10 +00:00
|
|
|
Err: err,
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func getClientOverflowError(c *client) error {
|
|
|
|
err := fmt.Errorf("Client: Requests' queue with size=%d is overflown. Try increasing Client.PendingRequests value", cap(c.requestQueueChan))
|
|
|
|
//c.LogError("%s", err)
|
|
|
|
return &ClientError{
|
|
|
|
Overflow: true,
|
2017-11-22 11:55:10 +00:00
|
|
|
Err: err,
|
2017-10-31 09:25:44 +00:00
|
|
|
}
|
|
|
|
}
|