This commit is contained in:
crusader 2018-04-04 14:47:10 +09:00
parent 67bc6d5d03
commit cb6cff91ac
13 changed files with 61 additions and 69 deletions

View File

@ -1,4 +1,4 @@
package internal package server
import ( import (
"compress/flate" "compress/flate"

View File

@ -1,4 +1,4 @@
package internal package server
import ( import (
"errors" "errors"

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package internal package server
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,4 +1,4 @@
package internal package server
import "unsafe" import "unsafe"

View File

@ -1,4 +1,4 @@
package internal package server
import ( import (
"bytes" "bytes"

View File

@ -4,7 +4,7 @@
// +build go1.5 // +build go1.5
package internal package server
import "io" import "io"

View File

@ -1,4 +1,4 @@
package internal package server
import ( import (
"bufio" "bufio"

View File

@ -16,9 +16,8 @@ import (
"sync" "sync"
"time" "time"
logging "git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
server "git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
"git.loafle.net/commons/server-go/internal"
) )
var errMalformedURL = errors.New("malformed ws or wss URL") var errMalformedURL = errors.New("malformed ws or wss URL")
@ -82,14 +81,14 @@ type Client struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
conn *internal.Conn conn *server.Conn
readChan chan []byte readChan chan []byte
writeChan chan []byte writeChan chan []byte
} }
func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, res *http.Response, err error) { func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, res *http.Response, err error) {
var ( var (
conn *internal.Conn conn *server.Conn
) )
if c.stopChan != nil { if c.stopChan != nil {
@ -132,7 +131,7 @@ func (c *Client) clientMessage(msg string) string {
return fmt.Sprintf("Client[%s]: %s", c.Name, msg) return fmt.Sprintf("Client[%s]: %s", c.Name, msg)
} }
func (c *Client) connect() (*internal.Conn, *http.Response, error) { func (c *Client) connect() (*server.Conn, *http.Response, error) {
conn, res, err := c.Dial() conn, res, err := c.Dial()
if nil != err { if nil != err {
return nil, nil, err return nil, nil, err
@ -145,7 +144,7 @@ func (c *Client) connect() (*internal.Conn, *http.Response, error) {
return conn, res, nil return conn, res, nil
} }
func (c *Client) handleConnection(conn *internal.Conn) { func (c *Client) handleConnection(conn *server.Conn) {
defer func() { defer func() {
if nil != conn { if nil != conn {
conn.Close() conn.Close()
@ -184,7 +183,7 @@ func (c *Client) handleConnection(conn *internal.Conn) {
} }
} }
func handleClientRead(c *Client, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) { func handleClientRead(c *Client, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -207,7 +206,7 @@ func handleClientRead(c *Client, conn *internal.Conn, doneChan chan<- struct{},
go func() { go func() {
_, message, err = conn.ReadMessage() _, message, err = conn.ReadMessage()
if err != nil { if err != nil {
if internal.IsUnexpectedCloseError(err, internal.CloseGoingAway, internal.CloseAbnormalClosure) { if server.IsUnexpectedCloseError(err, server.CloseGoingAway, server.CloseAbnormalClosure) {
logging.Logger().Debugf(c.clientMessage(fmt.Sprintf("Read error %v", err))) logging.Logger().Debugf(c.clientMessage(fmt.Sprintf("Read error %v", err)))
} }
} }
@ -234,7 +233,7 @@ func handleClientRead(c *Client, conn *internal.Conn, doneChan chan<- struct{},
} }
} }
func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) { func handleClientWrite(c *Client, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -248,11 +247,11 @@ func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{},
case message, ok := <-c.writeChan: case message, ok := <-c.writeChan:
conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
if !ok { if !ok {
conn.WriteMessage(internal.CloseMessage, []byte{}) conn.WriteMessage(server.CloseMessage, []byte{})
return return
} }
w, err := conn.NextWriter(internal.TextMessage) w, err := conn.NextWriter(server.TextMessage)
if err != nil { if err != nil {
return return
} }
@ -263,7 +262,7 @@ func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{},
} }
case <-ticker.C: case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(c.PingTimeout)) conn.SetWriteDeadline(time.Now().Add(c.PingTimeout))
if err := conn.WriteMessage(internal.PingMessage, nil); nil != err { if err := conn.WriteMessage(server.PingMessage, nil); nil != err {
return return
} }
case <-c.stopChan: case <-c.stopChan:
@ -272,7 +271,7 @@ func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{},
} }
} }
func (c *Client) Dial() (*internal.Conn, *http.Response, error) { func (c *Client) Dial() (*server.Conn, *http.Response, error) {
var ( var (
err error err error
challengeKey string challengeKey string
@ -432,7 +431,7 @@ func (c *Client) Dial() (*internal.Conn, *http.Response, error) {
} }
} }
conn := internal.NewConn(netConn, false, c.ReadBufferSize, c.WriteBufferSize) conn := server.NewConn(netConn, false, c.ReadBufferSize, c.WriteBufferSize)
if err := req.Write(netConn); err != nil { if err := req.Write(netConn); err != nil {
return nil, nil, err return nil, nil, err
@ -459,7 +458,7 @@ func (c *Client) Dial() (*internal.Conn, *http.Response, error) {
buf := make([]byte, 1024) buf := make([]byte, 1024)
n, _ := io.ReadFull(resp.Body, buf) n, _ := io.ReadFull(resp.Body, buf)
resp.Body = ioutil.NopCloser(bytes.NewReader(buf[:n])) resp.Body = ioutil.NopCloser(bytes.NewReader(buf[:n]))
return nil, resp, internal.ErrBadHandshake return nil, resp, server.ErrBadHandshake
} }
for _, ext := range httpParseExtensions(resp.Header) { for _, ext := range httpParseExtensions(resp.Header) {
@ -469,10 +468,10 @@ func (c *Client) Dial() (*internal.Conn, *http.Response, error) {
_, snct := ext["server_no_context_takeover"] _, snct := ext["server_no_context_takeover"]
_, cnct := ext["client_no_context_takeover"] _, cnct := ext["client_no_context_takeover"]
if !snct || !cnct { if !snct || !cnct {
return nil, resp, internal.ErrInvalidCompression return nil, resp, server.ErrInvalidCompression
} }
conn.NewCompressionWriter = internal.CompressNoContextTakeover conn.NewCompressionWriter = server.CompressNoContextTakeover
conn.NewDecompressionReader = internal.DecompressNoContextTakeover conn.NewDecompressionReader = server.DecompressNoContextTakeover
break break
} }

View File

@ -10,7 +10,6 @@ import (
"git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
"git.loafle.net/commons/server-go/internal"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
@ -184,7 +183,7 @@ func (s *Server) httpHandler(ctx *fasthttp.RequestCtx) {
return return
} }
s.upgrader.Upgrade(ctx, responseHeader, func(conn *internal.Conn, err error) { s.upgrader.Upgrade(ctx, responseHeader, func(conn *server.Conn, err error) {
if err != nil { if err != nil {
s.onError(ctx, fasthttp.StatusInternalServerError, err) s.onError(ctx, fasthttp.StatusInternalServerError, err)
return return
@ -195,7 +194,7 @@ func (s *Server) httpHandler(ctx *fasthttp.RequestCtx) {
}) })
} }
func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx, conn *internal.Conn) { func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx, conn *server.Conn) {
addr := conn.RemoteAddr() addr := conn.RemoteAddr()
defer func() { defer func() {
@ -260,7 +259,7 @@ func (s *Server) onError(ctx *fasthttp.RequestCtx, status int, reason error) {
s.ServerHandler.OnError(s.ctx, ctx, status, reason) s.ServerHandler.OnError(s.ctx, ctx, status, reason)
} }
func handleRead(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, readChan chan []byte) { func handleRead(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, readChan chan []byte) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -283,7 +282,7 @@ func handleRead(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopCh
go func() { go func() {
_, message, err = conn.ReadMessage() _, message, err = conn.ReadMessage()
if err != nil { if err != nil {
if internal.IsUnexpectedCloseError(err, internal.CloseGoingAway, internal.CloseAbnormalClosure) { if server.IsUnexpectedCloseError(err, server.CloseGoingAway, server.CloseAbnormalClosure) {
logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err))) logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err)))
} }
} }
@ -310,7 +309,7 @@ func handleRead(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopCh
} }
} }
func handleWrite(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, writeChan chan []byte) { func handleWrite(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, writeChan chan []byte) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -324,11 +323,11 @@ func handleWrite(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopC
case message, ok := <-writeChan: case message, ok := <-writeChan:
conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetWriteTimeout())) conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetWriteTimeout()))
if !ok { if !ok {
conn.WriteMessage(internal.CloseMessage, []byte{}) conn.WriteMessage(server.CloseMessage, []byte{})
return return
} }
w, err := conn.NextWriter(internal.TextMessage) w, err := conn.NextWriter(server.TextMessage)
if err != nil { if err != nil {
return return
} }
@ -339,7 +338,7 @@ func handleWrite(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopC
} }
case <-ticker.C: case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetPingTimeout())) conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetPingTimeout()))
if err := conn.WriteMessage(internal.PingMessage, nil); nil != err { if err := conn.WriteMessage(server.PingMessage, nil); nil != err {
return return
} }
case <-s.stopChan: case <-s.stopChan:

View File

@ -11,12 +11,12 @@ import (
"strings" "strings"
"time" "time"
"git.loafle.net/commons/server-go/internal" "git.loafle.net/commons/server-go"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
) )
type ( type (
OnUpgradeFunc func(*internal.Conn, error) OnUpgradeFunc func(*server.Conn, error)
) )
// HandshakeError describes an error with the handshake from the peer. // HandshakeError describes an error with the handshake from the peer.
@ -60,7 +60,7 @@ type Upgrader struct {
EnableCompression bool EnableCompression bool
} }
func (u *Upgrader) returnError(ctx *fasthttp.RequestCtx, status int, reason string) (*internal.Conn, error) { func (u *Upgrader) returnError(ctx *fasthttp.RequestCtx, status int, reason string) (*server.Conn, error) {
err := HandshakeError{reason} err := HandshakeError{reason}
if u.Error != nil { if u.Error != nil {
u.Error(ctx, status, err) u.Error(ctx, status, err)
@ -192,11 +192,11 @@ func (u *Upgrader) Upgrade(ctx *fasthttp.RequestCtx, responseHeader *fasthttp.Re
ctx.Request.Header.CopyTo(h) ctx.Request.Header.CopyTo(h)
ctx.Hijack(func(netConn net.Conn) { ctx.Hijack(func(netConn net.Conn) {
c := internal.NewConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize) c := server.NewConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize)
c.Subprotocol = subprotocol c.Subprotocol = subprotocol
if compress { if compress {
c.NewCompressionWriter = internal.CompressNoContextTakeover c.NewCompressionWriter = server.CompressNoContextTakeover
c.NewDecompressionReader = internal.DecompressNoContextTakeover c.NewDecompressionReader = server.DecompressNoContextTakeover
} }
// Clear deadlines set by HTTP server. // Clear deadlines set by HTTP server.

View File

@ -7,9 +7,8 @@ import (
"sync" "sync"
"time" "time"
logging "git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
server "git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
"git.loafle.net/commons/server-go/internal"
) )
type Client struct { type Client struct {
@ -56,14 +55,14 @@ type Client struct {
stopChan chan struct{} stopChan chan struct{}
stopWg sync.WaitGroup stopWg sync.WaitGroup
conn *internal.Conn conn *server.Conn
readChan chan []byte readChan chan []byte
writeChan chan []byte writeChan chan []byte
} }
func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) { func (c *Client) Connect() (readChan <-chan []byte, writeChan chan<- []byte, err error) {
var ( var (
conn *internal.Conn conn *server.Conn
) )
if c.stopChan != nil { if c.stopChan != nil {
@ -106,13 +105,13 @@ func (c *Client) clientMessage(msg string) string {
return fmt.Sprintf("Client[%s]: %s", c.Name, msg) return fmt.Sprintf("Client[%s]: %s", c.Name, msg)
} }
func (c *Client) connect() (*internal.Conn, error) { func (c *Client) connect() (*server.Conn, error) {
netConn, err := c.Dial() netConn, err := c.Dial()
if nil != err { if nil != err {
return nil, err return nil, err
} }
conn := internal.NewConn(netConn, false, c.ReadBufferSize, c.WriteBufferSize) conn := server.NewConn(netConn, false, c.ReadBufferSize, c.WriteBufferSize)
conn.SetCloseHandler(func(code int, text string) error { conn.SetCloseHandler(func(code int, text string) error {
logging.Logger().Debugf("close") logging.Logger().Debugf("close")
return nil return nil
@ -120,7 +119,7 @@ func (c *Client) connect() (*internal.Conn, error) {
return conn, nil return conn, nil
} }
func (c *Client) handleConnection(conn *internal.Conn) { func (c *Client) handleConnection(conn *server.Conn) {
defer func() { defer func() {
if nil != conn { if nil != conn {
conn.Close() conn.Close()
@ -159,7 +158,7 @@ func (c *Client) handleConnection(conn *internal.Conn) {
} }
} }
func handleClientRead(c *Client, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) { func handleClientRead(c *Client, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -182,7 +181,7 @@ func handleClientRead(c *Client, conn *internal.Conn, doneChan chan<- struct{},
go func() { go func() {
_, message, err = conn.ReadMessage() _, message, err = conn.ReadMessage()
if err != nil { if err != nil {
if internal.IsUnexpectedCloseError(err, internal.CloseGoingAway, internal.CloseAbnormalClosure) { if server.IsUnexpectedCloseError(err, server.CloseGoingAway, server.CloseAbnormalClosure) {
logging.Logger().Debugf(c.clientMessage(fmt.Sprintf("Read error %v", err))) logging.Logger().Debugf(c.clientMessage(fmt.Sprintf("Read error %v", err)))
} }
} }
@ -209,7 +208,7 @@ func handleClientRead(c *Client, conn *internal.Conn, doneChan chan<- struct{},
} }
} }
func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) { func handleClientWrite(c *Client, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -223,11 +222,11 @@ func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{},
case message, ok := <-c.writeChan: case message, ok := <-c.writeChan:
conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
if !ok { if !ok {
conn.WriteMessage(internal.CloseMessage, []byte{}) conn.WriteMessage(server.CloseMessage, []byte{})
return return
} }
w, err := conn.NextWriter(internal.TextMessage) w, err := conn.NextWriter(server.TextMessage)
if err != nil { if err != nil {
return return
} }
@ -238,7 +237,7 @@ func handleClientWrite(c *Client, conn *internal.Conn, doneChan chan<- struct{},
} }
case <-ticker.C: case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(c.PingTimeout)) conn.SetWriteDeadline(time.Now().Add(c.PingTimeout))
if err := conn.WriteMessage(internal.PingMessage, nil); nil != err { if err := conn.WriteMessage(server.PingMessage, nil); nil != err {
return return
} }
case <-c.stopChan: case <-c.stopChan:

View File

@ -10,7 +10,6 @@ import (
"git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
"git.loafle.net/commons/server-go/internal"
) )
type Server struct { type Server struct {
@ -165,14 +164,14 @@ func (s *Server) handleServer(listener net.Listener) error {
continue continue
} }
conn := internal.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize()) conn := server.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize())
s.stopWg.Add(1) s.stopWg.Add(1)
go s.handleConnection(servlet, servletCtx, conn) go s.handleConnection(servlet, servletCtx, conn)
} }
} }
func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx, conn *internal.Conn) { func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx, conn *server.Conn) {
addr := conn.RemoteAddr() addr := conn.RemoteAddr()
defer func() { defer func() {
@ -232,7 +231,7 @@ func (s *Server) handleConnection(servlet Servlet, servletCtx server.ServletCtx,
} }
} }
func handleRead(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, readChan chan []byte) { func handleRead(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, readChan chan []byte) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -255,7 +254,7 @@ func handleRead(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopCh
go func() { go func() {
_, message, err = conn.ReadMessage() _, message, err = conn.ReadMessage()
if err != nil { if err != nil {
if internal.IsUnexpectedCloseError(err, internal.CloseGoingAway, internal.CloseAbnormalClosure) { if server.IsUnexpectedCloseError(err, server.CloseGoingAway, server.CloseAbnormalClosure) {
logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err))) logging.Logger().Debugf(s.serverMessage(fmt.Sprintf("Read error %v", err)))
} }
} }
@ -282,7 +281,7 @@ func handleRead(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopCh
} }
} }
func handleWrite(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, writeChan chan []byte) { func handleWrite(s *Server, conn *server.Conn, doneChan chan<- struct{}, stopChan <-chan struct{}, writeChan chan []byte) {
defer func() { defer func() {
close(doneChan) close(doneChan)
}() }()
@ -296,11 +295,11 @@ func handleWrite(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopC
case message, ok := <-writeChan: case message, ok := <-writeChan:
conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetWriteTimeout())) conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetWriteTimeout()))
if !ok { if !ok {
conn.WriteMessage(internal.CloseMessage, []byte{}) conn.WriteMessage(server.CloseMessage, []byte{})
return return
} }
w, err := conn.NextWriter(internal.TextMessage) w, err := conn.NextWriter(server.TextMessage)
if err != nil { if err != nil {
return return
} }
@ -311,7 +310,7 @@ func handleWrite(s *Server, conn *internal.Conn, doneChan chan<- struct{}, stopC
} }
case <-ticker.C: case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetPingTimeout())) conn.SetWriteDeadline(time.Now().Add(s.ServerHandler.GetPingTimeout()))
if err := conn.WriteMessage(internal.PingMessage, nil); nil != err { if err := conn.WriteMessage(server.PingMessage, nil); nil != err {
return return
} }
case <-s.stopChan: case <-s.stopChan:

View File

@ -1,16 +1,12 @@
package server package server
import (
"git.loafle.net/commons/server-go/internal"
)
type Servlet interface { type Servlet interface {
ServletCtx(serverCtx ServerCtx) ServletCtx ServletCtx(serverCtx ServerCtx) ServletCtx
Init(serverCtx ServerCtx) error Init(serverCtx ServerCtx) error
Destroy(serverCtx ServerCtx) Destroy(serverCtx ServerCtx)
OnConnect(servletCtx ServletCtx, conn *internal.Conn) OnConnect(servletCtx ServletCtx, conn *Conn)
Handle(servletCtx ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan []byte, writeChan chan<- []byte) Handle(servletCtx ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan []byte, writeChan chan<- []byte)
OnDisconnect(servletCtx ServletCtx) OnDisconnect(servletCtx ServletCtx)
} }