diff --git a/client/call.go b/client/call.go index a63b848..f20247b 100644 --- a/client/call.go +++ b/client/call.go @@ -21,7 +21,7 @@ type CallState struct { canceled uint32 } -func (cs *CallState) done() { +func (cs *CallState) Done() { select { case cs.DoneChan <- cs: // ok @@ -50,7 +50,7 @@ func (cs *CallState) IsCanceled() bool { return atomic.LoadUint32(&cs.canceled) != 0 } -func retainCallState() *CallState { +func RetainCallState() *CallState { v := callStatePool.Get() if v == nil { return &CallState{} @@ -58,7 +58,7 @@ func retainCallState() *CallState { return v.(*CallState) } -func releaseCallState(cs *CallState) { +func ReleaseCallState(cs *CallState) { cs.Method = "" cs.Args = nil cs.Result = nil diff --git a/client/client.go b/client/client.go index 4b3dd7b..95bcb40 100644 --- a/client/client.go +++ b/client/client.go @@ -82,7 +82,7 @@ func (c *client) Notify(method string, args ...interface{}) (err error) { select { case <-cs.DoneChan: err = cs.Error - releaseCallState(cs) + ReleaseCallState(cs) } return @@ -103,7 +103,7 @@ func (c *client) CallTimeout(timeout time.Duration, result interface{}, method s select { case <-cs.DoneChan: result, err = cs.Result, cs.Error - releaseCallState(cs) + ReleaseCallState(cs) case <-t.C: cs.Cancel() err = getClientTimeoutError(c, timeout) @@ -120,7 +120,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method } if usePool { - cs = retainCallState() + cs = RetainCallState() } else { cs = &CallState{} } @@ -147,7 +147,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method // Immediately notify the caller not interested // in the response on requests' queue overflow, since // there are no other ways to notify it later. - releaseCallState(cs) + ReleaseCallState(cs) return nil, getClientOverflowError(c) } @@ -156,9 +156,9 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method if rcs.DoneChan != nil { rcs.Error = getClientOverflowError(c) //close(rcs.DoneChan) - rcs.done() + rcs.Done() } else { - releaseCallState(rcs) + ReleaseCallState(rcs) } default: } @@ -169,7 +169,7 @@ func (c *client) send(usePool bool, hasResponse bool, result interface{}, method default: // Release m even if usePool = true, since m wasn't exposed // to the caller yet. - releaseCallState(cs) + ReleaseCallState(cs) return nil, getClientOverflowError(c) } } @@ -206,7 +206,7 @@ func (c *client) handleRPC() { log.Printf("handleRPC: %v", err) err = &ClientError{ Connection: true, - err: err, + Err: err, } } @@ -214,7 +214,7 @@ func (c *client) handleRPC() { atomic.AddUint32(&c.pendingRequestsCount, ^uint32(0)) cs.Error = err if cs.DoneChan != nil { - cs.done() + cs.Done() } } @@ -246,9 +246,9 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { if nil != cs.DoneChan { // cs.Error = ErrCanceled // close(m.done) - cs.done() + cs.Done() } else { - releaseCallState(cs) + ReleaseCallState(cs) } continue } @@ -269,7 +269,7 @@ func (c *client) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { err = c.ch.GetCodec().Write(c.conn, cs.Method, cs.Args, cs.ID) if !cs.hasResponse { cs.Error = err - cs.done() + cs.Done() } if nil != err { err = fmt.Errorf("Client: Cannot send request to wire: [%s]", err) @@ -335,7 +335,7 @@ func (c *client) responseHandle(codecResponse protocol.ClientCodecResponse) erro // } } - cs.done() + cs.Done() return nil } @@ -351,7 +351,7 @@ func getClientTimeoutError(c *client, timeout time.Duration) error { //c.LogError("%s", err) return &ClientError{ Timeout: true, - err: err, + Err: err, } } @@ -360,6 +360,6 @@ func getClientOverflowError(c *client) error { //c.LogError("%s", err) return &ClientError{ Overflow: true, - err: err, + Err: err, } } diff --git a/client/error.go b/client/error.go index e10be8d..4561bb3 100644 --- a/client/error.go +++ b/client/error.go @@ -18,9 +18,9 @@ type ClientError struct { // May be set if AsyncResult.Cancel is called. Canceled bool - err error + Err error } func (e *ClientError) Error() string { - return e.err.Error() + return e.Err.Error() } diff --git a/notify/constants.go b/notify/constants.go new file mode 100644 index 0000000..1682d20 --- /dev/null +++ b/notify/constants.go @@ -0,0 +1,7 @@ +package notify + +const ( + // DefaultPendingNotifies is the default number of pending messages + // handled by Client and Server. + DefaultPendingNotifies = 1024 +) diff --git a/notify/notify.go b/notify/notify.go new file mode 100644 index 0000000..d05280f --- /dev/null +++ b/notify/notify.go @@ -0,0 +1,163 @@ +package notify + +import ( + "fmt" + "log" + "net" + "runtime" + "sync" + + "git.loafle.net/commons_go/rpc/client" +) + +func New(nh NotifyHandler) Notifier { + n := ¬ify{ + nh: nh, + } + return n +} + +type Notifier interface { + Start(conn net.Conn) + Close() + + Notify(method string, args ...interface{}) (err error) +} + +type notify struct { + nh NotifyHandler + + conn net.Conn + + notifyQueueChan chan *client.CallState + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (n *notify) Start(conn net.Conn) { + n.nh.Validate() + + if n.stopChan != nil { + panic("RPC Notify: the given notify is already started. Call Notifier.Stop() before calling Notifier.Start(conn) again!") + } + + n.conn = conn + + n.stopChan = make(chan struct{}) + n.notifyQueueChan = make(chan *client.CallState, n.nh.GetPendingNotifies()) + + go n.handleRPC() +} + +func (n *notify) Close() { + if n.stopChan == nil { + panic("RPC Notify: the notify must be started before stopping it") + } + close(n.stopChan) + n.stopWg.Wait() + n.stopChan = nil +} + +func (n *notify) Notify(method string, args ...interface{}) (err error) { + var cs *client.CallState + if cs, err = n.send(method, args...); nil != err { + return + } + + select { + case <-cs.DoneChan: + err = cs.Error + client.ReleaseCallState(cs) + } + + return +} + +func (n *notify) send(method string, args ...interface{}) (cs *client.CallState, err error) { + cs = client.RetainCallState() + + cs.Method = method + cs.Args = args + cs.DoneChan = make(chan *client.CallState, 1) + + select { + case n.notifyQueueChan <- cs: + return cs, nil + default: + client.ReleaseCallState(cs) + return nil, getClientOverflowError(n) + } +} + +func (n *notify) handleRPC() { + subStopChan := make(chan struct{}) + + writerDone := make(chan error, 1) + go n.rpcWriter(subStopChan, writerDone) + + var err error + + select { + case err = <-writerDone: + close(subStopChan) + case <-n.stopChan: + close(subStopChan) + <-writerDone + } + + if err != nil { + //n.LogError("%s", err) + log.Printf("handleRPC: %v", err) + err = &client.ClientError{ + Connection: true, + Err: err, + } + } +} + +func (n *notify) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { + var err error + defer func() { + writerDone <- err + }() + + for { + var cs *client.CallState + + select { + case cs = <-n.notifyQueueChan: + default: + // Give the last chance for ready goroutines filling n.requestsChan :) + runtime.Gosched() + + select { + case <-stopChan: + return + case cs = <-n.notifyQueueChan: + } + } + + if cs.IsCanceled() { + client.ReleaseCallState(cs) + continue + } + + err = n.nh.GetCodec().Write(n.conn, cs.Method, cs.Args, cs.ID) + cs.Error = err + cs.Done() + if nil != err { + err = fmt.Errorf("RPC Notify: Cannot send notify to wire: [%s]", err) + return + } + } +} + +func getClientOverflowError(n *notify) error { + err := fmt.Errorf("RPC Notify: Notifies' queue with size=%d is overflown. Try increasing NotifyHandler.PendingNotifies value", cap(n.notifyQueueChan)) + //c.LogError("%s", err) + return &client.ClientError{ + Overflow: true, + Err: err, + } +} diff --git a/notify/notify_handler.go b/notify/notify_handler.go new file mode 100644 index 0000000..cf1db4c --- /dev/null +++ b/notify/notify_handler.go @@ -0,0 +1,12 @@ +package notify + +import ( + "git.loafle.net/commons_go/rpc/protocol" +) + +type NotifyHandler interface { + GetCodec() protocol.ClientCodec + GetPendingNotifies() int + + Validate() +} diff --git a/notify/notify_handlers.go b/notify/notify_handlers.go new file mode 100644 index 0000000..8fd1ffd --- /dev/null +++ b/notify/notify_handlers.go @@ -0,0 +1,32 @@ +package notify + +import ( + "git.loafle.net/commons_go/rpc/protocol" +) + +type NotifyHandlers struct { + Codec protocol.ClientCodec + + // The maximum number of pending requests in the queue. + // + // The number of pending requsts should exceed the expected number + // of concurrent goroutines calling client's methods. + // Otherwise a lot of ClientError.Overflow errors may appear. + // + // Default is DefaultPendingNotifies. + PendingNotifies int +} + +func (nh *NotifyHandlers) GetCodec() protocol.ClientCodec { + return nh.Codec +} + +func (nh *NotifyHandlers) GetPendingNotifies() int { + return nh.PendingNotifies +} + +func (nh *NotifyHandlers) Validate() { + if nh.PendingNotifies <= 0 { + nh.PendingNotifies = DefaultPendingNotifies + } +}