This commit is contained in:
crusader 2018-06-29 19:02:40 +09:00
parent e34e7030c2
commit d1d485c711
12 changed files with 42 additions and 30 deletions

View File

@ -9,8 +9,8 @@ import (
type ClientReadWriter struct { type ClientReadWriter struct {
ReadwriteHandler ReadWriteHandler ReadwriteHandler ReadWriteHandler
ReadChan chan<- []byte ReadChan chan<- SocketMessage
WriteChan <-chan []byte WriteChan <-chan SocketMessage
DisconnectedChan chan<- struct{} DisconnectedChan chan<- struct{}
ReconnectedChan <-chan Conn ReconnectedChan <-chan Conn
ClientStopChan <-chan struct{} ClientStopChan <-chan struct{}

View File

@ -2,12 +2,14 @@ package client
import ( import (
"sync/atomic" "sync/atomic"
"git.loafle.net/commons/server-go/socket"
) )
type OnDisconnectedFunc func(connector Connector) type OnDisconnectedFunc func(connector Connector)
type Connector interface { type Connector interface {
Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) Connect() (readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage, err error)
Disconnect() error Disconnect() error
GetName() string GetName() string

View File

@ -144,7 +144,6 @@ type Conn interface {
WriteControl(messageType int, data []byte, deadline time.Time) error WriteControl(messageType int, data []byte, deadline time.Time) error
WriteJSON(v interface{}) error WriteJSON(v interface{}) error
WriteMessage(messageType int, data []byte) error WriteMessage(messageType int, data []byte) error
WriteCompress(messageType int, data []byte) error
WritePreparedMessage(pm *PreparedMessage) error WritePreparedMessage(pm *PreparedMessage) error
} }

View File

@ -9,8 +9,8 @@ import (
"time" "time"
"git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go/client"
"git.loafle.net/commons/server-go/socket" "git.loafle.net/commons/server-go/socket"
"git.loafle.net/commons/server-go/socket/client"
) )
type Connectors struct { type Connectors struct {
@ -25,8 +25,8 @@ type Connectors struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
readChan chan []byte readChan chan socket.SocketMessage
writeChan chan []byte writeChan chan socket.SocketMessage
disconnectedChan chan struct{} disconnectedChan chan struct{}
reconnectedChan chan socket.Conn reconnectedChan chan socket.Conn
@ -36,7 +36,7 @@ type Connectors struct {
validated atomic.Value validated atomic.Value
} }
func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { func (c *Connectors) Connect() (readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage, err error) {
var ( var (
conn socket.Conn conn socket.Conn
) )
@ -50,8 +50,8 @@ func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte,
return nil, nil, err return nil, nil, err
} }
c.readChan = make(chan []byte, 256) c.readChan = make(chan socket.SocketMessage, 256)
c.writeChan = make(chan []byte, 256) c.writeChan = make(chan socket.SocketMessage, 256)
c.disconnectedChan = make(chan struct{}) c.disconnectedChan = make(chan struct{})
c.reconnectedChan = make(chan socket.Conn) c.reconnectedChan = make(chan socket.Conn)
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})

View File

@ -45,7 +45,7 @@ func (s *Servlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
// //
} }
func (s *Servlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan []byte, writeChan chan<- []byte) { func (s *Servlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage) {
} }

View File

@ -7,7 +7,13 @@ import (
logging "git.loafle.net/commons/logging-go" logging "git.loafle.net/commons/logging-go"
) )
func connReadHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-chan struct{}, doneChan chan<- error, readChan chan<- []byte) { type SocketMessage func() (int, []byte)
func MakeSocketMessage(messageType int, message []byte) SocketMessage {
return func() (int, []byte) { return messageType, message }
}
func connReadHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-chan struct{}, doneChan chan<- error, readChan chan<- SocketMessage) {
var ( var (
err error err error
) )
@ -36,10 +42,11 @@ func connReadHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-ch
for { for {
var message []byte var message []byte
var messageType int
readMessageChan := make(chan struct{}) readMessageChan := make(chan struct{})
go func() { go func() {
_, message, err = conn.ReadMessage() messageType, message, err = conn.ReadMessage()
close(readMessageChan) close(readMessageChan)
}() }()
@ -57,13 +64,15 @@ func connReadHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-ch
return return
} }
readChan <- message readChan <- MakeSocketMessage(messageType, message)
} }
} }
func connWriteHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-chan struct{}, doneChan chan<- error, writeChan <-chan []byte) { func connWriteHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-chan struct{}, doneChan chan<- error, writeChan <-chan SocketMessage) {
var ( var (
socketMessage SocketMessage
message []byte message []byte
messageType int
ok bool ok bool
err error err error
) )
@ -78,7 +87,7 @@ func connWriteHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-c
}() }()
for { for {
select { select {
case message, ok = <-writeChan: case socketMessage, ok = <-writeChan:
if 0 < readWriteHandler.GetWriteTimeout() { if 0 < readWriteHandler.GetWriteTimeout() {
conn.SetWriteDeadline(time.Now().Add(readWriteHandler.GetWriteTimeout())) conn.SetWriteDeadline(time.Now().Add(readWriteHandler.GetWriteTimeout()))
} else { } else {
@ -89,7 +98,9 @@ func connWriteHandler(readWriteHandler ReadWriteHandler, conn Conn, stopChan <-c
return return
} }
err = conn.WriteCompress(TextMessage, message) messageType, message = socketMessage()
err = conn.WriteMessage(messageType, message)
if err != nil { if err != nil {
logging.Logger().Debug(err) logging.Logger().Debug(err)
return return

View File

@ -50,8 +50,8 @@ func (srw *ServerReadWriter) HandleConnection(servlet Servlet, servletCtx server
stopChan := make(chan struct{}) stopChan := make(chan struct{})
servletDoneChan := make(chan struct{}) servletDoneChan := make(chan struct{})
readChan := make(chan []byte) readChan := make(chan SocketMessage)
writeChan := make(chan []byte) writeChan := make(chan SocketMessage)
readerDoneChan := make(chan error) readerDoneChan := make(chan error)
writerDoneChan := make(chan error) writerDoneChan := make(chan error)

View File

@ -8,6 +8,6 @@ type Servlet interface {
server.Servlet server.Servlet
OnConnect(servletCtx server.ServletCtx, conn Conn) OnConnect(servletCtx server.ServletCtx, conn Conn)
Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan []byte, writeChan chan<- []byte) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan SocketMessage, writeChan chan<- SocketMessage)
OnDisconnect(servletCtx server.ServletCtx) OnDisconnect(servletCtx server.ServletCtx)
} }

View File

@ -18,8 +18,8 @@ import (
"time" "time"
"git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go/client"
"git.loafle.net/commons/server-go/socket" "git.loafle.net/commons/server-go/socket"
"git.loafle.net/commons/server-go/socket/client"
"git.loafle.net/commons/server-go/socket/web" "git.loafle.net/commons/server-go/socket/web"
) )
@ -57,8 +57,8 @@ type Connectors struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
readChan chan []byte readChan chan socket.SocketMessage
writeChan chan []byte writeChan chan socket.SocketMessage
disconnectedChan chan struct{} disconnectedChan chan struct{}
reconnectedChan chan socket.Conn reconnectedChan chan socket.Conn
@ -68,7 +68,7 @@ type Connectors struct {
validated atomic.Value validated atomic.Value
} }
func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { func (c *Connectors) Connect() (readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage, err error) {
var ( var (
conn socket.Conn conn socket.Conn
res *http.Response res *http.Response
@ -87,8 +87,8 @@ func (c *Connectors) Connect() (readChan <-chan []byte, writeChan chan<- []byte,
resH(res) resH(res)
} }
c.readChan = make(chan []byte, 256) c.readChan = make(chan socket.SocketMessage, 256)
c.writeChan = make(chan []byte, 256) c.writeChan = make(chan socket.SocketMessage, 256)
c.disconnectedChan = make(chan struct{}) c.disconnectedChan = make(chan struct{})
c.reconnectedChan = make(chan socket.Conn) c.reconnectedChan = make(chan socket.Conn)
c.stopChan = make(chan struct{}) c.stopChan = make(chan struct{})

View File

@ -44,7 +44,7 @@ func (s *Servlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
// //
} }
func (s *Servlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan []byte, writeChan chan<- []byte) { func (s *Servlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage) {
} }