This commit is contained in:
crusader 2017-10-31 19:46:32 +09:00
parent ab60b6ccda
commit d7fdb8c4f3
2 changed files with 22 additions and 22 deletions

View File

@ -17,13 +17,13 @@ type Client interface {
func NewClient(ch ClientHandler) Client { func NewClient(ch ClientHandler) Client {
s := &client{ s := &client{
clientHandler: ch, ch: ch,
} }
return s return s
} }
type client struct { type client struct {
clientHandler ClientHandler ch ClientHandler
Stats ConnStats Stats ConnStats
@ -32,17 +32,17 @@ type client struct {
} }
func (c *client) Start() error { func (c *client) Start() error {
if nil == c.clientHandler { if nil == c.ch {
panic("Client: client handler must be specified.") panic("Client: client handler must be specified.")
} }
c.clientHandler.Validate() c.ch.Validate()
if c.stopChan != nil { if c.stopChan != nil {
panic("Client: client is already running. Stop it before starting it again") panic("Client: client is already running. Stop it before starting it again")
} }
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
c.clientHandler.OnStart() c.ch.OnStart()
c.stopWg.Add(1) c.stopWg.Add(1)
go runClient(c) go runClient(c)
@ -57,7 +57,7 @@ func (c *client) Stop() {
close(c.stopChan) close(c.stopChan)
c.stopWg.Wait() c.stopWg.Wait()
c.stopChan = nil c.stopChan = nil
c.clientHandler.OnStop() c.ch.OnStop()
} }
func runClient(c *client) { func runClient(c *client) {
@ -70,9 +70,9 @@ func runClient(c *client) {
for { for {
dialChan := make(chan struct{}) dialChan := make(chan struct{})
go func() { go func() {
if conn, err = c.clientHandler.Dial(); err != nil { if conn, err = c.ch.Dial(); err != nil {
if stopping.Load() == nil { if stopping.Load() == nil {
logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.clientHandler.GetAddr(), err)) logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.ch.GetAddr(), err))
} }
} }
close(dialChan) close(dialChan)
@ -109,14 +109,14 @@ func runClient(c *client) {
} }
func handleClientConnection(c *client, conn io.ReadWriteCloser) { func handleClientConnection(c *client, conn io.ReadWriteCloser) {
if err := c.clientHandler.OnHandshake(c.clientHandler.GetAddr(), conn); nil != err { if err := c.ch.OnHandshake(c.ch.GetAddr(), conn); nil != err {
logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.clientHandler.GetAddr(), err)) logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.ch.GetAddr(), err))
conn.Close() conn.Close()
return return
} }
clientStopChan := make(chan struct{}) clientStopChan := make(chan struct{})
go c.clientHandler.Handle(conn, clientStopChan) go c.ch.Handle(conn, clientStopChan)
select { select {
case <-c.stopChan: case <-c.stopChan:

View File

@ -19,13 +19,13 @@ type Server interface {
func NewServer(sh ServerHandler) Server { func NewServer(sh ServerHandler) Server {
s := &server{ s := &server{
serverHandler: sh, sh: sh,
} }
return s return s
} }
type server struct { type server struct {
serverHandler ServerHandler sh ServerHandler
listener net.Listener listener net.Listener
@ -36,21 +36,21 @@ type server struct {
} }
func (s *server) Start() error { func (s *server) Start() error {
if nil == s.serverHandler { if nil == s.sh {
panic("Server: server handler must be specified.") panic("Server: server handler must be specified.")
} }
s.serverHandler.Validate() s.sh.Validate()
if s.stopChan != nil { if s.stopChan != nil {
panic("Server: server is already running. Stop it before starting it again") panic("Server: server is already running. Stop it before starting it again")
} }
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
var err error var err error
if s.listener, err = s.serverHandler.Listen(); nil != err { if s.listener, err = s.sh.Listen(); nil != err {
return err return err
} }
s.serverHandler.OnStart() s.sh.OnStart()
s.stopWg.Add(1) s.stopWg.Add(1)
go runServer(s) go runServer(s)
@ -65,7 +65,7 @@ func (s *server) Stop() {
close(s.stopChan) close(s.stopChan)
s.stopWg.Wait() s.stopWg.Wait()
s.stopChan = nil s.stopChan = nil
s.serverHandler.OnStop() s.sh.OnStop()
} }
func (s *server) Serve() error { func (s *server) Serve() error {
@ -87,9 +87,9 @@ func runServer(s *server) {
for { for {
acceptChan := make(chan struct{}) acceptChan := make(chan struct{})
go func() { go func() {
if conn, clientAddr, err = s.serverHandler.accept(s.listener); err != nil { if conn, clientAddr, err = s.sh.accept(s.listener); err != nil {
if stopping.Load() == nil { if stopping.Load() == nil {
logging.Logger.Error(fmt.Sprintf("Server: [%s]. Cannot accept new connection: [%s]", s.serverHandler.GetAddr(), err)) logging.Logger.Error(fmt.Sprintf("Server: [%s]. Cannot accept new connection: [%s]", s.sh.GetAddr(), err))
} }
} }
close(acceptChan) close(acceptChan)
@ -123,7 +123,7 @@ func runServer(s *server) {
func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr string) { func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr string) {
defer s.stopWg.Done() defer s.stopWg.Done()
if err := s.serverHandler.OnHandshake(clientAddr, conn); nil != err { if err := s.sh.OnHandshake(clientAddr, conn); nil != err {
logging.Logger.Error(fmt.Sprintf("Server: [%s]. handshake error: [%s]", clientAddr, err)) logging.Logger.Error(fmt.Sprintf("Server: [%s]. handshake error: [%s]", clientAddr, err))
conn.Close() conn.Close()
return return
@ -131,7 +131,7 @@ func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr strin
logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", clientAddr)) logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", clientAddr))
clientStopChan := make(chan struct{}) clientStopChan := make(chan struct{})
go s.serverHandler.Handle(clientAddr, conn, clientStopChan) go s.sh.Handle(clientAddr, conn, clientStopChan)
select { select {
case <-s.stopChan: case <-s.stopChan: