This commit is contained in:
crusader 2017-11-22 20:55:10 +09:00
parent c9a94de195
commit 5dafb388b1
7 changed files with 234 additions and 20 deletions

View File

@ -21,7 +21,7 @@ type CallState struct {
canceled uint32
}
func (cs *CallState) done() {
func (cs *CallState) Done() {
select {
case cs.DoneChan <- cs:
// ok
@ -50,7 +50,7 @@ func (cs *CallState) IsCanceled() bool {
return atomic.LoadUint32(&cs.canceled) != 0
}
func retainCallState() *CallState {
func RetainCallState() *CallState {
v := callStatePool.Get()
if v == nil {
return &CallState{}
@ -58,7 +58,7 @@ func retainCallState() *CallState {
return v.(*CallState)
}
func releaseCallState(cs *CallState) {
func ReleaseCallState(cs *CallState) {
cs.Method = ""
cs.Args = nil
cs.Result = nil

View File

@ -82,7 +82,7 @@ func (c *client) Notify(method string, args ...interface{}) (err error) {
select {
case <-cs.DoneChan:
err = cs.Error
releaseCallState(cs)
ReleaseCallState(cs)
}
return
@ -103,7 +103,7 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s
select {
case <-cs.DoneChan:
result, err = cs.Result, cs.Error
releaseCallState(cs)
ReleaseCallState(cs)
case <-t.C:
cs.Cancel()
err = getClientTimeoutError(c, timeout)
@ -120,7 +120,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
}
if usePool {
cs = retainCallState()
cs = RetainCallState()
} else {
cs = &CallState{}
}
@ -147,7 +147,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
// Immediately notify the caller not interested
// in the response on requests' queue overflow, since
// there are no other ways to notify it later.
releaseCallState(cs)
ReleaseCallState(cs)
return nil, getClientOverflowError(c)
}
@ -156,9 +156,9 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
if rcs.DoneChan != nil {
rcs.Error = getClientOverflowError(c)
//close(rcs.DoneChan)
rcs.done()
rcs.Done()
} else {
releaseCallState(rcs)
ReleaseCallState(rcs)
}
default:
}
@ -169,7 +169,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method
default:
// Release m even if usePool = true, since m wasn't exposed
// to the caller yet.
releaseCallState(cs)
ReleaseCallState(cs)
return nil, getClientOverflowError(c)
}
}
@ -206,7 +206,7 @@ func (c *client) handleRPC() {
log.Printf("handleRPC: %v", err)
err = &ClientError{
Connection: true,
err: err,
Err: err,
}
}
@ -214,7 +214,7 @@ func (c *client) handleRPC() {
atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0))
cs.Error = err
if cs.DoneChan != nil {
cs.done()
cs.Done()
}
}
@ -246,9 +246,9 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
if nil != cs.DoneChan {
// cs.Error = ErrCanceled
// close(m.done)
cs.done()
cs.Done()
} else {
releaseCallState(cs)
ReleaseCallState(cs)
}
continue
}
@ -269,7 +269,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, cs.ID)
if !cs.hasResponse {
cs.Error = err
cs.done()
cs.Done()
}
if nil != err {
err = fmt.Errorf("Client: Cannot send request to wire: [%s]", err)
@ -335,7 +335,7 @@ func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) erro
// }
}
cs.done()
cs.Done()
return nil
}
@ -351,7 +351,7 @@ func getClientTimeoutError(c *client, timeout time.Duration) error {
//c.LogError("%s", err)
return &ClientError{
Timeout: true,
err: err,
Err: err,
}
}
@ -360,6 +360,6 @@ func getClientOverflowError(c *client) error {
//c.LogError("%s", err)
return &ClientError{
Overflow: true,
err: err,
Err: err,
}
}

View File

@ -18,9 +18,9 @@ type ClientError struct {
// May be set if AsyncResult.Cancel is called.
Canceled bool
err error
Err error
}
func (e *ClientError) Error() string {
return e.err.Error()
return e.Err.Error()
}

7
notify/constants.go Normal file
View File

@ -0,0 +1,7 @@
package notify
const (
// DefaultPendingNotifies is the default number of pending messages
// handled by Client and Server.
DefaultPendingNotifies = 1024
)

163
notify/notify.go Normal file
View File

@ -0,0 +1,163 @@
package notify
import (
"fmt"
"log"
"net"
"runtime"
"sync"
"git.loafle.net/commons_go/rpc/client"
)
func New(nh NotifyHandler) Notifier {
n := &notify{
nh: nh,
}
return n
}
type Notifier interface {
Start(conn net.Conn)
Close()
Notify(method string, args ...interface{}) (err error)
}
type notify struct {
nh NotifyHandler
conn net.Conn
notifyQueueChan chan *client.CallState
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (n *notify) Start(conn net.Conn) {
n.nh.Validate()
if n.stopChan != nil {
panic("RPC Notify: the given notify is already started. Call Notifier.Stop() before calling Notifier.Start(conn) again!")
}
n.conn = conn
n.stopChan = make(chan struct{})
n.notifyQueueChan = make(chan *client.CallState, n.nh.GetPendingNotifies())
go n.handleRPC()
}
func (n *notify) Close() {
if n.stopChan == nil {
panic("RPC Notify: the notify must be started before stopping it")
}
close(n.stopChan)
n.stopWg.Wait()
n.stopChan = nil
}
func (n *notify) Notify(method string, args ...interface{}) (err error) {
var cs *client.CallState
if cs, err = n.send(method, args...); nil != err {
return
}
select {
case <-cs.DoneChan:
err = cs.Error
client.ReleaseCallState(cs)
}
return
}
func (n *notify) send(method string, args ...interface{}) (cs *client.CallState, err error) {
cs = client.RetainCallState()
cs.Method = method
cs.Args = args
cs.DoneChan = make(chan *client.CallState, 1)
select {
case n.notifyQueueChan <- cs:
return cs, nil
default:
client.ReleaseCallState(cs)
return nil, getClientOverflowError(n)
}
}
func (n *notify) handleRPC() {
subStopChan := make(chan struct{})
writerDone := make(chan error, 1)
go n.rpcWriter(subStopChan, writerDone)
var err error
select {
case err = <-writerDone:
close(subStopChan)
case <-n.stopChan:
close(subStopChan)
<-writerDone
}
if err != nil {
//n.LogError("%s", err)
log.Printf("handleRPC: %v", err)
err = &client.ClientError{
Connection: true,
Err: err,
}
}
}
func (n *notify) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) {
var err error
defer func() {
writerDone <- err
}()
for {
var cs *client.CallState
select {
case cs = <-n.notifyQueueChan:
default:
// Give the last chance for ready goroutines filling n.requestsChan :)
runtime.Gosched()
select {
case <-stopChan:
return
case cs = <-n.notifyQueueChan:
}
}
if cs.IsCanceled() {
client.ReleaseCallState(cs)
continue
}
err = n.nh.GetCodec().Write(n.conn, cs.Method, cs.Args, cs.ID)
cs.Error = err
cs.Done()
if nil != err {
err = fmt.Errorf("RPC Notify: Cannot send notify to wire: [%s]", err)
return
}
}
}
func getClientOverflowError(n *notify) error {
err := fmt.Errorf("RPC Notify: Notifies' queue with size=%d is overflown. Try increasing NotifyHandler.PendingNotifies value", cap(n.notifyQueueChan))
//c.LogError("%s", err)
return &client.ClientError{
Overflow: true,
Err: err,
}
}

12
notify/notify_handler.go Normal file
View File

@ -0,0 +1,12 @@
package notify
import (
"git.loafle.net/commons_go/rpc/protocol"
)
type NotifyHandler interface {
GetCodec() protocol.ClientCodec
GetPendingNotifies() int
Validate()
}

32
notify/notify_handlers.go Normal file
View File

@ -0,0 +1,32 @@
package notify
import (
"git.loafle.net/commons_go/rpc/protocol"
)
type NotifyHandlers struct {
Codec protocol.ClientCodec
// The maximum number of pending requests in the queue.
//
// The number of pending requsts should exceed the expected number
// of concurrent goroutines calling client's methods.
// Otherwise a lot of ClientError.Overflow errors may appear.
//
// Default is DefaultPendingNotifies.
PendingNotifies int
}
func (nh *NotifyHandlers) GetCodec() protocol.ClientCodec {
return nh.Codec
}
func (nh *NotifyHandlers) GetPendingNotifies() int {
return nh.PendingNotifies
}
func (nh *NotifyHandlers) Validate() {
if nh.PendingNotifies <= 0 {
nh.PendingNotifies = DefaultPendingNotifies
}
}