package server

import (
	"fmt"
	"io"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"git.loafle.net/commons_go/logging"
)

// Client manages a background connection loop that can be started and stopped.
type Client interface {
	// Start launches the background connection loop.
	Start()
	// Stop signals the background loop to exit and waits until it has done so.
	Stop()
}

// NewClient returns a Client that delegates dialing, handshaking and
// connection handling to ch. The handler is not checked here; Start panics
// if it is nil.
func NewClient(ch ClientHandler) Client {
	return &client{ch: ch}
}

// client is the default Client implementation.
//
// stopChan is read and written by Start and Stop without any locking, so
// those two methods must not be called concurrently with each other.
type client struct {
	// ch supplies the dial, handshake, handle and lifecycle callbacks.
	ch ClientHandler

	// Stats collects dial counters updated by the connection loop.
	Stats ConnStats

	// stopChan is non-nil while the client is running; closing it asks the
	// connection loop (and any active connection) to shut down.
	stopChan chan struct{}
	// stopWg lets Stop wait for the connection loop goroutine to exit.
	stopWg sync.WaitGroup
}

// Start validates the handler, invokes its OnStart callback and launches the
// connection loop in a new goroutine.
//
// It panics if no handler was supplied or if the client is already running.
func (c *client) Start() {
	if nil == c.ch {
		panic("Client: client handler must be specified.")
	}
	c.ch.Validate()

	if c.stopChan != nil {
		panic("Client: client is already running. Stop it before starting it again")
	}
	c.stopChan = make(chan struct{})

	c.ch.OnStart()

	c.stopWg.Add(1)
	go runClient(c)
}

// Stop closes the stop channel, waits for the connection loop to exit and
// then invokes the handler's OnStop callback.
//
// Because the loop waits for an in-flight Dial to return before exiting,
// Stop blocks for as long as that Dial call does.
//
// It panics if the client has not been started.
func (c *client) Stop() {
	if c.stopChan == nil {
		panic("Client: client must be started before stopping it")
	}
	close(c.stopChan)
	c.stopWg.Wait()
	c.stopChan = nil

	c.ch.OnStop()
}

// runClient is the connection loop: it dials, hands the connection to
// handleClientConnection, and redials after the connection ends. A failed
// dial is retried after a one second pause. The loop exits once c.stopChan
// is closed.
func runClient(c *client) {
	defer c.stopWg.Done()

	var conn io.ReadWriteCloser
	var err error
	// stopping is set before waiting on an in-flight dial during shutdown so
	// that the dial goroutine does not log an error caused by stopping.
	var stopping atomic.Value

	for {
		// Dial in a separate goroutine so a stop request can be observed
		// while the dial is still in progress. conn and err are only read
		// after dialChan is closed, which orders the accesses.
		dialChan := make(chan struct{})
		go func() {
			if conn, err = c.ch.Dial(); err != nil {
				if stopping.Load() == nil {
					logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.ch.GetAddr(), err))
				}
			}
			close(dialChan)
		}()

		select {
		case <-c.stopChan:
			stopping.Store(true)
			<-dialChan
			// The dial may have succeeded while we were shutting down;
			// close the connection so it is not leaked.
			if err == nil && conn != nil {
				conn.Close()
			}
			return
		case <-dialChan:
			c.Stats.incDialCalls()
		}

		if err != nil {
			c.Stats.incDialErrors()

			// Pause before redialing, but stay responsive to stop requests.
			// An explicit timer is used so it can be released immediately
			// when the pause is cut short.
			retry := time.NewTimer(time.Second)
			select {
			case <-c.stopChan:
				retry.Stop()
				return
			case <-retry.C:
			}
			continue
		}

		handleClientConnection(c, conn)

		// Do not redial if the connection ended because of a stop request.
		select {
		case <-c.stopChan:
			return
		default:
		}
	}
}

// handleClientConnection performs the handshake on conn and then runs the
// handler's Handle method in its own goroutine. It returns, after closing
// conn, when either the client is stopped or clientStopChan becomes readable.
//
// The Handle goroutine is not awaited here.
//
// NOTE(review): clientStopChan is closed here on stop and is also used to
// learn that the handler is done. If the handler signals completion by
// closing the channel, a concurrent stop could close it twice and panic;
// confirm the ClientHandler.Handle contract.
func handleClientConnection(c *client, conn io.ReadWriteCloser) {
	if err := c.ch.OnHandshake(c.ch.GetAddr(), conn); nil != err {
		logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.ch.GetAddr(), err))
		conn.Close()
		return
	}

	log.Printf("handleClientConnection")

	clientStopChan := make(chan struct{})
	go c.ch.Handle(conn, clientStopChan)

	select {
	case <-c.stopChan:
		// Ask the handler to stop before tearing the connection down.
		close(clientStopChan)
		conn.Close()
		return
	case <-clientStopChan:
		conn.Close()
		return
	}
}