package server import ( "fmt" "io" "sync" "sync/atomic" "time" "git.loafle.net/commons_go/logging" ) type Client interface { Start() error Stop() } func NewClient(ch ClientHandler) Client { s := &client{ clientHandler: ch, } return s } type client struct { clientHandler ClientHandler Stats ConnStats stopChan chan struct{} stopWg sync.WaitGroup } func (c *client) Start() error { if nil == c.clientHandler { panic("Client: client handler must be specified.") } c.clientHandler.Validate() if c.stopChan != nil { panic("Client: client is already running. Stop it before starting it again") } c.stopChan = make(chan struct{}) c.stopWg.Add(1) go runClient(c) return nil } func (c *client) Stop() { if c.stopChan == nil { panic("Client: client must be started before stopping it") } close(c.stopChan) c.stopWg.Wait() c.stopChan = nil } func runClient(c *client) { defer c.stopWg.Done() var conn io.ReadWriteCloser var err error var stopping atomic.Value for { dialChan := make(chan struct{}) go func() { if conn, err = c.clientHandler.Dial(); err != nil { if stopping.Load() == nil { logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.clientHandler.GetAddr(), err)) } } close(dialChan) }() select { case <-c.stopChan: stopping.Store(true) <-dialChan return case <-dialChan: c.Stats.incDialCalls() } if err != nil { c.Stats.incDialErrors() select { case <-c.stopChan: return case <-time.After(time.Second): } continue } c.stopWg.Add(1) handleClientConnection(c, conn) select { case <-c.stopChan: return default: } } } func handleClientConnection(c *client, conn io.ReadWriteCloser) { if err := c.clientHandler.OnHandshake(c.clientHandler.GetAddr(), conn); nil != err { logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.clientHandler.GetAddr(), err)) conn.Close() return } clientStopChan := make(chan struct{}) go c.clientHandler.Handle(conn, clientStopChan) select { case <-c.stopChan: close(clientStopChan) conn.Close() return case <-clientStopChan: conn.Close() return } }