This commit is contained in:
crusader 2018-05-02 17:47:34 +09:00
parent 74d68de28f
commit 6729b0465a
4 changed files with 175 additions and 31 deletions

19
client/client-context.go Normal file
View File

@ -0,0 +1,19 @@
package client
import (
cuc "git.loafle.net/commons/util-go/context"
)
type ClientCtx interface {
cuc.Context
}
func NewClientCtx(parent cuc.Context) ClientCtx {
return &clientCtx{
Context: cuc.NewContext(parent),
}
}
type clientCtx struct {
cuc.Context
}

118
client/client-handler.go Normal file
View File

@ -0,0 +1,118 @@
package client
import (
"fmt"
"sync/atomic"
"time"
rpc "git.loafle.net/commons/rpc-go"
"git.loafle.net/commons/rpc-go/protocol"
"git.loafle.net/commons/rpc-go/registry"
csc "git.loafle.net/commons/server-go/client"
)
type ClientHandler interface {
GetName() string
GetConnector() csc.Connector
GetRPCCodec() protocol.ClientCodec
GetRPCInvoker() registry.RPCInvoker
GetPendingRequestCount() int
GetRequestTimeout() time.Duration
ClientCtx() ClientCtx
Init(clientCtx ClientCtx) error
OnStart(clientCtx ClientCtx) error
OnStop(clientCtx ClientCtx)
Destroy(clientCtx ClientCtx)
Validate() error
}
type ClientHandlers struct {
// Client name for sending in response headers.
//
// Default client name is used if left blank.
Name string `json:"name,omitempty"`
Connector csc.Connector `json:"-"`
RPCCodec protocol.ClientCodec `json:"-"`
RPCInvoker registry.RPCInvoker `json:"-"`
PendingRequestCount int `json:"pendingRequests,omitempty"`
RequestTimeout time.Duration `json:"requestTimeout,omitempty"`
validated atomic.Value
}
func (ch *ClientHandlers) ClientCtx() ClientCtx {
return NewClientCtx(nil)
}
func (ch *ClientHandlers) Init(clientCtx ClientCtx) error {
return nil
}
func (ch *ClientHandlers) OnStart(clientCtx ClientCtx) error {
return nil
}
func (ch *ClientHandlers) OnStop(clientCtx ClientCtx) {
}
func (ch *ClientHandlers) Destroy(clientCtx ClientCtx) {
}
func (ch *ClientHandlers) GetName() string {
return ch.Name
}
func (ch *ClientHandlers) GetConnector() csc.Connector {
return ch.Connector
}
func (ch *ClientHandlers) GetRPCCodec() protocol.ClientCodec {
return ch.RPCCodec
}
func (ch *ClientHandlers) GetRPCInvoker() registry.RPCInvoker {
return ch.RPCInvoker
}
func (ch *ClientHandlers) GetPendingRequestCount() int {
return ch.PendingRequestCount
}
func (ch *ClientHandlers) GetRequestTimeout() time.Duration {
return ch.RequestTimeout
}
func (ch *ClientHandlers) Validate() error {
if nil != ch.validated.Load() {
return nil
}
ch.validated.Store(true)
if "" == ch.Name {
ch.Name = "Client"
}
if 0 >= ch.PendingRequestCount {
ch.PendingRequestCount = rpc.DefaultPendingRequestCount
}
if ch.RequestTimeout <= 0 {
ch.RequestTimeout = rpc.DefaultRequestTimeout
} else {
ch.RequestTimeout = ch.RequestTimeout * time.Second
}
if nil == ch.RPCCodec {
return fmt.Errorf("RPCCodec is not valid")
}
if nil == ch.Connector {
return fmt.Errorf("Connector is not valid")
}
if err := ch.Connector.Validate(); nil != err {
return err
}
return nil
}

View File

@ -9,20 +9,14 @@ import (
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/rpc-go/protocol" "git.loafle.net/commons/rpc-go/protocol"
"git.loafle.net/commons/rpc-go/registry"
csc "git.loafle.net/commons/server-go/client"
) )
var uint64Type = reflect.TypeOf(uint64(0)) var uint64Type = reflect.TypeOf(uint64(0))
type Client struct { type Client struct {
Connector csc.Connector ClientHandler ClientHandler
Codec protocol.ClientCodec
RPCInvoker registry.RPCInvoker
Name string
PendingRequests int
RequestTimeout time.Duration
ctx ClientCtx
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
@ -37,32 +31,28 @@ func (c *Client) Start() error {
return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader()) return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader())
} }
if nil == c.Connector { if nil == c.ClientHandler {
return fmt.Errorf("%s Connector is not valid", c.logHeader()) return fmt.Errorf("%s ClientHandler must be specified", c.logHeader())
}
if err := c.ClientHandler.Validate(); nil != err {
return fmt.Errorf("%s validate error %v", c.logHeader(), err)
} }
if err := c.Connector.Validate(); nil != err { c.ctx = c.ClientHandler.ClientCtx()
return err if nil == c.ctx {
return fmt.Errorf("%s ServerCtx is nil", c.logHeader())
} }
if nil == c.Codec { if err := c.ClientHandler.Init(c.ctx); nil != err {
return fmt.Errorf("%s Codec is not valid", c.logHeader()) return fmt.Errorf("%s Init error %v", c.logHeader(), err)
} }
if 0 >= c.PendingRequests { readChan, writeChan, err := c.ClientHandler.GetConnector().Connect()
c.PendingRequests = 1024
}
if 0 >= c.RequestTimeout {
c.RequestTimeout = 20 * time.Second
}
readChan, writeChan, err := c.Connector.Connect()
if nil != err { if nil != err {
return err return err
} }
c.requestQueueChan = make(chan *requestState, c.PendingRequests) c.requestQueueChan = make(chan *requestState, c.ClientHandler.GetPendingRequestCount())
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
c.stopWg.Add(1) c.stopWg.Add(1)
@ -77,13 +67,16 @@ func (c *Client) Stop(ctx context.Context) error {
} }
close(c.stopChan) close(c.stopChan)
c.stopWg.Wait() c.stopWg.Wait()
c.ClientHandler.Destroy(c.ctx)
c.stopChan = nil c.stopChan = nil
return nil return nil
} }
func (c *Client) logHeader() string { func (c *Client) logHeader() string {
return fmt.Sprintf("RPC Client[%s]:", c.Name) return fmt.Sprintf("RPC Client[%s]:", c.ClientHandler.GetName())
} }
func (c *Client) Send(method string, params ...interface{}) error { func (c *Client) Send(method string, params ...interface{}) error {
@ -93,7 +86,7 @@ func (c *Client) Send(method string, params ...interface{}) error {
} }
func (c *Client) Call(result interface{}, method string, params ...interface{}) error { func (c *Client) Call(result interface{}, method string, params ...interface{}) error {
return c.CallTimeout(c.RequestTimeout, result, method, params...) return c.CallTimeout(c.ClientHandler.GetRequestTimeout(), result, method, params...)
} }
func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method string, params ...interface{}) error { func (c *Client) CallTimeout(timeout time.Duration, result interface{}, method string, params ...interface{}) error {
@ -168,14 +161,20 @@ func (c *Client) internalSend(hasResponse bool, result interface{}, method strin
func (c *Client) handleClient(readChan <-chan []byte, writeChan chan<- []byte) { func (c *Client) handleClient(readChan <-chan []byte, writeChan chan<- []byte) {
defer func() { defer func() {
if err := c.Connector.Disconnect(); nil != err { if err := c.ClientHandler.GetConnector().Disconnect(); nil != err {
logging.Logger().Warn(err) logging.Logger().Warn(err)
} }
c.ClientHandler.OnStop(c.ctx)
logging.Logger().Infof("%s Stopped", c.logHeader()) logging.Logger().Infof("%s Stopped", c.logHeader())
c.stopWg.Done() c.stopWg.Done()
}() }()
if err := c.ClientHandler.OnStart(c.ctx); nil != err {
logging.Logger().Error(err)
return
}
stopChan := make(chan struct{}) stopChan := make(chan struct{})
sendDoneChan := make(chan error) sendDoneChan := make(chan error)
receiveDoneChan := make(chan error) receiveDoneChan := make(chan error)
@ -232,7 +231,7 @@ LOOP:
if 0 < rs.id { if 0 < rs.id {
id = rs.id id = rs.id
} }
message, err = c.Codec.NewRequest(rs.method, rs.params, id) message, err = c.ClientHandler.GetRPCCodec().NewRequest(rs.method, rs.params, id)
if nil != err { if nil != err {
rs.setError(err) rs.setError(err)
rs.done() rs.done()
@ -274,7 +273,7 @@ LOOP:
if !ok { if !ok {
return return
} }
resCodec, err := c.Codec.NewResponse(message) resCodec, err := c.ClientHandler.GetRPCCodec().NewResponse(message)
if nil != err { if nil != err {
logging.Logger().Debug(err) logging.Logger().Debug(err)
continue LOOP continue LOOP
@ -320,12 +319,12 @@ func (c *Client) handleResponse(resCodec protocol.ClientResponseCodec) {
} }
func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec) { func (c *Client) handleNotification(notiCodec protocol.ClientNotificationCodec) {
if nil == c.RPCInvoker { if nil == c.ClientHandler.GetRPCInvoker() {
logging.Logger().Warnf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method()) logging.Logger().Warnf("%s received notification method[%s] but RPC Invoker is not exist", c.logHeader(), notiCodec.Method())
return return
} }
_, err := c.RPCInvoker.Invoke(notiCodec) _, err := c.ClientHandler.GetRPCInvoker().Invoke(notiCodec)
if nil != err { if nil != err {
logging.Logger().Errorf("%s invoking of notification method[%s] has been failed %v", c.logHeader(), notiCodec.Method(), err) logging.Logger().Errorf("%s invoking of notification method[%s] has been failed %v", c.logHeader(), notiCodec.Method(), err)
} }

8
const.go Normal file
View File

@ -0,0 +1,8 @@
package rpc
import "time"
const (
DefaultPendingRequestCount = 1024
DefaultRequestTimeout = 20 * time.Second
)