package notify import ( "fmt" "log" "net" "runtime" "sync" "git.loafle.net/commons_go/rpc/client" ) func New(nh NotifyHandler) Notifier { n := ¬ify{ nh: nh, } return n } type Notifier interface { Start(conn net.Conn) Close() Notify(method string, args ...interface{}) (err error) } type notify struct { nh NotifyHandler conn net.Conn notifyQueueChan chan *client.CallState stopChan chan struct{} stopWg sync.WaitGroup } func (n *notify) Start(conn net.Conn) { n.nh.Validate() if n.stopChan != nil { panic("RPC Notify: the given notify is already started. Call Notifier.Stop() before calling Notifier.Start(conn) again!") } n.conn = conn n.stopChan = make(chan struct{}) n.notifyQueueChan = make(chan *client.CallState, n.nh.GetPendingNotifies()) go n.handleRPC() } func (n *notify) Close() { if n.stopChan == nil { panic("RPC Notify: the notify must be started before stopping it") } close(n.stopChan) n.stopWg.Wait() n.stopChan = nil } func (n *notify) Notify(method string, args ...interface{}) (err error) { var cs *client.CallState if cs, err = n.send(method, args...); nil != err { return } select { case <-cs.DoneChan: err = cs.Error client.ReleaseCallState(cs) } return } func (n *notify) send(method string, args ...interface{}) (cs *client.CallState, err error) { cs = client.RetainCallState() cs.Method = method cs.Args = args cs.DoneChan = make(chan *client.CallState, 1) select { case n.notifyQueueChan <- cs: return cs, nil default: client.ReleaseCallState(cs) return nil, getClientOverflowError(n) } } func (n *notify) handleRPC() { subStopChan := make(chan struct{}) writerDone := make(chan error, 1) go n.rpcWriter(subStopChan, writerDone) var err error select { case err = <-writerDone: close(subStopChan) case <-n.stopChan: close(subStopChan) <-writerDone } if err != nil { //n.LogError("%s", err) log.Printf("handleRPC: %v", err) err = &client.ClientError{ Connection: true, Err: err, } } } func (n *notify) rpcWriter(stopChan <-chan struct{}, writerDone chan<- error) { var err error defer func() { writerDone <- err }() for { var cs *client.CallState select { case cs = <-n.notifyQueueChan: default: // Give the last chance for ready goroutines filling n.requestsChan :) runtime.Gosched() select { case <-stopChan: return case cs = <-n.notifyQueueChan: } } if cs.IsCanceled() { client.ReleaseCallState(cs) continue } err = n.nh.GetCodec().Write(n.conn, cs.Method, cs.Args, cs.ID) cs.Error = err cs.Done() if nil != err { err = fmt.Errorf("RPC Notify: Cannot send notify to wire: [%s]", err) return } } } func getClientOverflowError(n *notify) error { err := fmt.Errorf("RPC Notify: Notifies' queue with size=%d is overflown. Try increasing NotifyHandler.PendingNotifies value", cap(n.notifyQueueChan)) //c.LogError("%s", err) return &client.ClientError{ Overflow: true, Err: err, } }