This commit is contained in:
crusader 2018-04-04 17:05:39 +09:00
parent e12a800dd3
commit 29455b69ca

View File

@ -188,6 +188,10 @@ func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx,
defer s.connections.Delete(conn) defer s.connections.Delete(conn)
servlet.OnConnect(servletCtx, conn) servlet.OnConnect(servletCtx, conn)
conn.SetCloseHandler(func(code int, text string) error {
logging.Logger().Debugf("close")
return nil
})
stopChan := make(chan struct{}) stopChan := make(chan struct{})
servletDoneChan := make(chan struct{}) servletDoneChan := make(chan struct{})
@ -205,35 +209,27 @@ func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx,
select { select {
case <-readerDoneChan: case <-readerDoneChan:
close(stopChan) close(stopChan)
conn.Close()
<-writerDoneChan <-writerDoneChan
<-servletDoneChan <-servletDoneChan
conn = nil
case <-writerDoneChan: case <-writerDoneChan:
close(stopChan) close(stopChan)
conn.Close()
<-readerDoneChan <-readerDoneChan
<-servletDoneChan <-servletDoneChan
conn = nil
case <-servletDoneChan: case <-servletDoneChan:
close(stopChan) close(stopChan)
conn.Close()
<-readerDoneChan <-readerDoneChan
<-writerDoneChan <-writerDoneChan
conn = nil
case <-s.stopChan: case <-s.stopChan:
close(stopChan) close(stopChan)
conn.Close()
<-readerDoneChan <-readerDoneChan
<-writerDoneChan <-writerDoneChan
<-servletDoneChan <-servletDoneChan
conn = nil
} }
} }
func handleRead(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, readChan chan []byte) { func handleRead(s *Server, conn *server.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan chan []byte) {
defer func() { defer func() {
close(doneChan) doneChan <- struct{}{}
}() }()
if 0 < s.ServerHandler.GetMaxMessageSize() { if 0 < s.ServerHandler.GetMaxMessageSize() {
@ -257,37 +253,31 @@ func handleRead(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan
go func() { go func() {
_, message, err = conn.ReadMessage() _, message, err = conn.ReadMessage()
if err != nil {
if server.IsUnexpectedCloseError(err, server.CloseGoingAway, server.CloseAbnormalClosure) {
logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err)))
}
}
close(readMessageChan) close(readMessageChan)
}() }()
select { select {
case <-s.stopChan: case <-stopChan:
conn.Close()
<-readMessageChan <-readMessageChan
break break
case <-readMessageChan: case <-readMessageChan:
} }
if nil != err { if nil != err {
select { if server.IsUnexpectedCloseError(err, server.CloseGoingAway, server.CloseAbnormalClosure) {
case <-s.stopChan: logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err)))
break
case <-time.After(time.Second):
} }
continue break
} }
readChan <- message readChan <- message
} }
} }
func handleWrite(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, writeChan chan []byte) { func handleWrite(s *Server, conn *server.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}, writeChan chan []byte) {
defer func() { defer func() {
close(doneChan) doneChan <- struct{}{}
}() }()
ticker := time.NewTicker(s.ServerHandler.GetPingPeriod()) ticker := time.NewTicker(s.ServerHandler.GetPingPeriod())
@ -319,8 +309,8 @@ func handleWrite(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopCha
if err := conn.WriteMessage(server.PingMessage, nil); nil != err { if err := conn.WriteMessage(server.PingMessage, nil); nil != err {
return return
} }
case <-s.stopChan: case <-stopChan:
break return
} }
} }
} }