This commit is contained in:
crusader 2018-04-04 22:50:34 +09:00
parent 1ca5974e12
commit acbc1b22be
6 changed files with 49 additions and 49 deletions

View File

@ -4,28 +4,28 @@ import (
"time" "time"
) )
type ClientConnectionHandler interface { type ClientConnHandler interface {
ConnectionHandler ConnectionHandler
GetReconnectInterval() time.Duration GetReconnectInterval() time.Duration
GetReconnectTryTime() int GetReconnectTryTime() int
} }
type ClientConnectionHandlers struct { type ClientConnHandlers struct {
ConnectionHandlers ConnectionHandlers
ReconnectInterval time.Duration ReconnectInterval time.Duration
ReconnectTryTime int ReconnectTryTime int
} }
func (cch *ClientConnectionHandlers) GetReconnectInterval() time.Duration { func (cch *ClientConnHandlers) GetReconnectInterval() time.Duration {
return cch.ReconnectInterval return cch.ReconnectInterval
} }
func (cch *ClientConnectionHandlers) GetReconnectTryTime() int { func (cch *ClientConnHandlers) GetReconnectTryTime() int {
return cch.ReconnectTryTime return cch.ReconnectTryTime
} }
func (cch *ClientConnectionHandlers) Validate() error { func (cch *ClientConnHandlers) Validate() error {
if err := cch.ConnectionHandlers.Validate(); nil != err { if err := cch.ConnectionHandlers.Validate(); nil != err {
return err return err
} }

View File

@ -7,7 +7,7 @@ import (
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
) )
type ClientRWCHandler struct { type ClientReadWriter struct {
ReadwriteHandler ReadWriteHandler ReadwriteHandler ReadWriteHandler
ReadChan chan<- []byte ReadChan chan<- []byte
WriteChan <-chan []byte WriteChan <-chan []byte
@ -17,14 +17,14 @@ type ClientRWCHandler struct {
ClientStopWg *sync.WaitGroup ClientStopWg *sync.WaitGroup
} }
func (crwch *ClientRWCHandler) HandleConnection(conn *Conn) { func (crw *ClientReadWriter) HandleConnection(conn *Conn) {
defer func() { defer func() {
if nil != conn { if nil != conn {
conn.Close() conn.Close()
} }
logging.Logger().Infof("disconnected") logging.Logger().Infof("disconnected")
crwch.ClientStopWg.Done() crw.ClientStopWg.Done()
}() }()
logging.Logger().Infof("connected") logging.Logger().Infof("connected")
@ -39,8 +39,8 @@ func (crwch *ClientRWCHandler) HandleConnection(conn *Conn) {
for { for {
if nil != err { if nil != err {
if io.EOF == err || io.ErrUnexpectedEOF == err { if io.EOF == err || io.ErrUnexpectedEOF == err {
crwch.DisconnectedChan <- struct{}{} crw.DisconnectedChan <- struct{}{}
newConn := <-crwch.ReconnectedChan newConn := <-crw.ReconnectedChan
if nil == newConn { if nil == newConn {
return return
} }
@ -50,8 +50,8 @@ func (crwch *ClientRWCHandler) HandleConnection(conn *Conn) {
} }
} }
go connReadHandler(crwch.ReadwriteHandler, conn, stopChan, readerDoneChan, crwch.ReadChan) go connReadHandler(crw.ReadwriteHandler, conn, stopChan, readerDoneChan, crw.ReadChan)
go connWriteHandler(crwch.ReadwriteHandler, conn, stopChan, writerDoneChan, crwch.WriteChan) go connWriteHandler(crw.ReadwriteHandler, conn, stopChan, writerDoneChan, crw.WriteChan)
select { select {
case err = <-readerDoneChan: case err = <-readerDoneChan:
@ -60,7 +60,7 @@ func (crwch *ClientRWCHandler) HandleConnection(conn *Conn) {
case err = <-writerDoneChan: case err = <-writerDoneChan:
close(stopChan) close(stopChan)
<-readerDoneChan <-readerDoneChan
case <-crwch.ClientStopChan: case <-crw.ClientStopChan:
close(stopChan) close(stopChan)
<-readerDoneChan <-readerDoneChan
<-writerDoneChan <-writerDoneChan

View File

@ -23,7 +23,7 @@ import (
var errMalformedURL = errors.New("malformed ws or wss URL") var errMalformedURL = errors.New("malformed ws or wss URL")
type Client struct { type Client struct {
server.ClientConnectionHandlers server.ClientConnHandlers
server.ReadWriteHandlers server.ReadWriteHandlers
Name string Name string
@ -58,7 +58,7 @@ type Client struct {
disconnectedChan chan struct{} disconnectedChan chan struct{}
reconnectedChan chan *server.Conn reconnectedChan chan *server.Conn
crwch server.ClientRWCHandler crw server.ClientReadWriter
} }
func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, res *http.Response, err error) { func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, res *http.Response, err error) {
@ -86,16 +86,16 @@ func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, res
c.reconnectedChan = make(chan *server.Conn) c.reconnectedChan = make(chan *server.Conn)
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
c.crwch.ReadwriteHandler = c c.crw.ReadwriteHandler = c
c.crwch.ReadChan = c.readChan c.crw.ReadChan = c.readChan
c.crwch.WriteChan = c.writeChan c.crw.WriteChan = c.writeChan
c.crwch.ClientStopChan = c.stopChan c.crw.ClientStopChan = c.stopChan
c.crwch.ClientStopWg = &c.stopWg c.crw.ClientStopWg = &c.stopWg
c.crwch.DisconnectedChan = c.disconnectedChan c.crw.DisconnectedChan = c.disconnectedChan
c.crwch.ReconnectedChan = c.reconnectedChan c.crw.ReconnectedChan = c.reconnectedChan
c.stopWg.Add(1) c.stopWg.Add(1)
go c.crwch.HandleConnection(conn) go c.crw.HandleConnection(conn)
return c.readChan, c.writeChan, res, nil return c.readChan, c.writeChan, res, nil
} }

View File

@ -12,7 +12,7 @@ import (
) )
type Client struct { type Client struct {
server.ClientConnectionHandlers server.ClientConnHandlers
server.ReadWriteHandlers server.ReadWriteHandlers
Name string Name string
@ -30,7 +30,7 @@ type Client struct {
disconnectedChan chan struct{} disconnectedChan chan struct{}
reconnectedChan chan *server.Conn reconnectedChan chan *server.Conn
crwch server.ClientRWCHandler crw server.ClientReadWriter
} }
func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) {
@ -58,16 +58,16 @@ func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err
c.reconnectedChan = make(chan *server.Conn) c.reconnectedChan = make(chan *server.Conn)
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})
c.crwch.ReadwriteHandler = c c.crw.ReadwriteHandler = c
c.crwch.ReadChan = c.readChan c.crw.ReadChan = c.readChan
c.crwch.WriteChan = c.writeChan c.crw.WriteChan = c.writeChan
c.crwch.ClientStopChan = c.stopChan c.crw.ClientStopChan = c.stopChan
c.crwch.ClientStopWg = &c.stopWg c.crw.ClientStopWg = &c.stopWg
c.crwch.DisconnectedChan = c.disconnectedChan c.crw.DisconnectedChan = c.disconnectedChan
c.crwch.ReconnectedChan = c.reconnectedChan c.crw.ReconnectedChan = c.reconnectedChan
c.stopWg.Add(1) c.stopWg.Add(1)
go c.crwch.HandleConnection(conn) go c.crw.HandleConnection(conn)
return c.readChan, c.writeChan, nil return c.readChan, c.writeChan, nil
} }

View File

@ -19,7 +19,7 @@ type Server struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
srwch server.ServerRWCHandler srw server.ServerReadWriter
} }
func (s *Server) ListenAndServe() error { func (s *Server) ListenAndServe() error {
@ -53,9 +53,9 @@ func (s *Server) ListenAndServe() error {
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
s.srwch.ReadwriteHandler = s.ServerHandler s.srw.ReadwriteHandler = s.ServerHandler
s.srwch.ServerStopChan = s.stopChan s.srw.ServerStopChan = s.stopChan
s.srwch.ServerStopWg = &s.stopWg s.srw.ServerStopWg = &s.stopWg
s.stopWg.Add(1) s.stopWg.Add(1)
return s.handleServer(listener) return s.handleServer(listener)
@ -136,7 +136,7 @@ func (s *Server) handleServer(listener net.Listener) error {
} }
if 0 < s.ServerHandler.GetConcurrency() { if 0 < s.ServerHandler.GetConcurrency() {
sz := s.srwch.ConnectionSize() sz := s.srw.ConnectionSize()
if sz >= s.ServerHandler.GetConcurrency() { if sz >= s.ServerHandler.GetConcurrency() {
logging.Logger().Warnf(s.serverMessage(fmt.Sprintf("max connections size %d, refuse", sz))) logging.Logger().Warnf(s.serverMessage(fmt.Sprintf("max connections size %d, refuse", sz)))
netConn.Close() netConn.Close()
@ -164,6 +164,6 @@ func (s *Server) handleServer(listener net.Listener) error {
conn := server.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize()) conn := server.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize())
s.stopWg.Add(1) s.stopWg.Add(1)
go s.srwch.HandleConnection(servlet, servletCtx, conn) go s.srw.HandleConnection(servlet, servletCtx, conn)
} }
} }

View File

@ -6,7 +6,7 @@ import (
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
) )
type ServerRWCHandler struct { type ServerReadWriter struct {
connections sync.Map connections sync.Map
ReadwriteHandler ReadWriteHandler ReadwriteHandler ReadWriteHandler
@ -14,16 +14,16 @@ type ServerRWCHandler struct {
ServerStopWg *sync.WaitGroup ServerStopWg *sync.WaitGroup
} }
func (srwch *ServerRWCHandler) ConnectionSize() int { func (srw *ServerReadWriter) ConnectionSize() int {
var sz int var sz int
srwch.connections.Range(func(k, v interface{}) bool { srw.connections.Range(func(k, v interface{}) bool {
sz++ sz++
return true return true
}) })
return sz return sz
} }
func (srwch *ServerRWCHandler) HandleConnection(servlet Servlet, servletCtx ServletCtx, conn *Conn) { func (srw *ServerReadWriter) HandleConnection(servlet Servlet, servletCtx ServletCtx, conn *Conn) {
addr := conn.RemoteAddr() addr := conn.RemoteAddr()
defer func() { defer func() {
@ -32,13 +32,13 @@ func (srwch *ServerRWCHandler) HandleConnection(servlet Servlet, servletCtx Serv
} }
servlet.OnDisconnect(servletCtx) servlet.OnDisconnect(servletCtx)
logging.Logger().Infof("Client[%s] has been disconnected", addr) logging.Logger().Infof("Client[%s] has been disconnected", addr)
srwch.ServerStopWg.Done() srw.ServerStopWg.Done()
}() }()
logging.Logger().Infof("Client[%s] has been connected", addr) logging.Logger().Infof("Client[%s] has been connected", addr)
srwch.connections.Store(conn, true) srw.connections.Store(conn, true)
defer srwch.connections.Delete(conn) defer srw.connections.Delete(conn)
servlet.OnConnect(servletCtx, conn) servlet.OnConnect(servletCtx, conn)
conn.SetCloseHandler(func(code int, text string) error { conn.SetCloseHandler(func(code int, text string) error {
@ -56,8 +56,8 @@ func (srwch *ServerRWCHandler) HandleConnection(servlet Servlet, servletCtx Serv
writerDoneChan := make(chan error) writerDoneChan := make(chan error)
go servlet.Handle(servletCtx, stopChan, servletDoneChan, readChan, writeChan) go servlet.Handle(servletCtx, stopChan, servletDoneChan, readChan, writeChan)
go connReadHandler(srwch.ReadwriteHandler, conn, stopChan, readerDoneChan, readChan) go connReadHandler(srw.ReadwriteHandler, conn, stopChan, readerDoneChan, readChan)
go connWriteHandler(srwch.ReadwriteHandler, conn, stopChan, writerDoneChan, writeChan) go connWriteHandler(srw.ReadwriteHandler, conn, stopChan, writerDoneChan, writeChan)
select { select {
case <-readerDoneChan: case <-readerDoneChan:
@ -72,7 +72,7 @@ func (srwch *ServerRWCHandler) HandleConnection(servlet Servlet, servletCtx Serv
close(stopChan) close(stopChan)
<-readerDoneChan <-readerDoneChan
<-writerDoneChan <-writerDoneChan
case <-srwch.ServerStopChan: case <-srw.ServerStopChan:
close(stopChan) close(stopChan)
<-readerDoneChan <-readerDoneChan
<-writerDoneChan <-writerDoneChan