package socket import ( "fmt" "io" "time" ) func connReadHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-chan struct{}, doneChan chan<- error, readChan chan<- []byte) { var ( message []byte err error ) defer func() { doneChan <- err }() if 0 < readWriteHandler.GetMaxMessageSize() { conn.SetReadLimit(readWriteHandler.GetMaxMessageSize()) } if 0 < readWriteHandler.GetReadTimeout() { conn.SetReadDeadline(time.Now().Add(readWriteHandler.GetReadTimeout())) } conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(readWriteHandler.GetPongTimeout())) return nil }) for { readMessageChan := make(chan struct{}) go func() { _, message, err = conn.ReadMessage() close(readMessageChan) }() select { case <-stopChan: <-readMessageChan return case <-readMessageChan: } if nil != err { if IsUnexpectedCloseError(err, CloseGoingAway, CloseAbnormalClosure) { err = fmt.Errorf("Read error %v", err) } return } readChan <- message } } func connWriteHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-chan struct{}, doneChan chan<- error, writeChan <-chan []byte) { var ( wc io.WriteCloser message []byte ok bool err error ) defer func() { doneChan <- err }() ticker := time.NewTicker(readWriteHandler.GetPingPeriod()) defer func() { ticker.Stop() }() for { select { case message, ok = <-writeChan: if 0 < readWriteHandler.GetWriteTimeout() { conn.SetWriteDeadline(time.Now().Add(readWriteHandler.GetWriteTimeout())) } if !ok { conn.WriteMessage(CloseMessage, []byte{}) return } wc, err = conn.NextWriter(TextMessage) if err != nil { return } wc.Write(message) if err = wc.Close(); nil != err { return } case <-ticker.C: conn.SetWriteDeadline(time.Now().Add(readWriteHandler.GetPingTimeout())) if err = conn.WriteMessage(PingMessage, nil); nil != err { return } case <-stopChan: return } } }