127 lines
2.2 KiB
Go
127 lines
2.2 KiB
Go
|
package server
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"git.loafle.net/commons_go/logging"
|
||
|
)
|
||
|
|
||
|
// Client is the handle returned by NewClient for controlling a client's
// lifecycle.
type Client interface {
	// Start begins running the client. It returns an error value to the caller.
	Start() error

	// Stop shuts the client down.
	Stop()
}
|
||
|
|
||
|
func NewClient(ch ClientHandler) Client {
|
||
|
s := &client{
|
||
|
clientHandler: ch,
|
||
|
}
|
||
|
return s
|
||
|
}
|
||
|
|
||
|
type client struct {
|
||
|
clientHandler ClientHandler
|
||
|
|
||
|
Stats ConnStats
|
||
|
|
||
|
stopChan chan struct{}
|
||
|
stopWg sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
func (c *client) Start() error {
|
||
|
if nil == c.clientHandler {
|
||
|
panic("Client: client handler must be specified.")
|
||
|
}
|
||
|
c.clientHandler.Validate()
|
||
|
|
||
|
if c.stopChan != nil {
|
||
|
panic("Client: client is already running. Stop it before starting it again")
|
||
|
}
|
||
|
c.stopChan = make(chan struct{})
|
||
|
|
||
|
c.stopWg.Add(1)
|
||
|
go runClient(c)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *client) Stop() {
|
||
|
if c.stopChan == nil {
|
||
|
panic("Client: client must be started before stopping it")
|
||
|
}
|
||
|
close(c.stopChan)
|
||
|
c.stopWg.Wait()
|
||
|
c.stopChan = nil
|
||
|
}
|
||
|
|
||
|
func runClient(c *client) {
|
||
|
defer c.stopWg.Done()
|
||
|
|
||
|
var conn io.ReadWriteCloser
|
||
|
var err error
|
||
|
var stopping atomic.Value
|
||
|
|
||
|
for {
|
||
|
dialChan := make(chan struct{})
|
||
|
go func() {
|
||
|
if conn, err = c.clientHandler.Dial(); err != nil {
|
||
|
if stopping.Load() == nil {
|
||
|
logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.clientHandler.GetAddr(), err))
|
||
|
}
|
||
|
}
|
||
|
close(dialChan)
|
||
|
}()
|
||
|
|
||
|
select {
|
||
|
case <-c.stopChan:
|
||
|
stopping.Store(true)
|
||
|
<-dialChan
|
||
|
return
|
||
|
case <-dialChan:
|
||
|
c.Stats.incDialCalls()
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
c.Stats.incDialErrors()
|
||
|
select {
|
||
|
case <-c.stopChan:
|
||
|
return
|
||
|
case <-time.After(time.Second):
|
||
|
}
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
c.stopWg.Add(1)
|
||
|
handleClientConnection(c, conn)
|
||
|
|
||
|
select {
|
||
|
case <-c.stopChan:
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func handleClientConnection(c *client, conn io.ReadWriteCloser) {
|
||
|
if err := c.clientHandler.OnHandshake(c.clientHandler.GetAddr(), conn); nil != err {
|
||
|
logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.clientHandler.GetAddr(), err))
|
||
|
conn.Close()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
clientStopChan := make(chan struct{})
|
||
|
go c.clientHandler.Handle(conn, clientStopChan)
|
||
|
|
||
|
select {
|
||
|
case <-c.stopChan:
|
||
|
close(clientStopChan)
|
||
|
conn.Close()
|
||
|
return
|
||
|
case <-clientStopChan:
|
||
|
conn.Close()
|
||
|
return
|
||
|
}
|
||
|
}
|