project initialized
This commit is contained in:
219
socket/net/client/connector.go
Normal file
219
socket/net/client/connector.go
Normal file
@@ -0,0 +1,219 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
olog "git.loafle.net/overflow/log-go"
|
||||
"git.loafle.net/overflow/server-go/socket"
|
||||
"git.loafle.net/overflow/server-go/socket/client"
|
||||
)
|
||||
|
||||
type Connectors struct {
|
||||
client.Connectors
|
||||
|
||||
Network string `json:"network,omitempty"`
|
||||
Address string `json:"address,omitempty"`
|
||||
LocalAddress net.Addr `json:"-"`
|
||||
|
||||
stopChan chan struct{}
|
||||
stopWg sync.WaitGroup
|
||||
|
||||
readChan chan socket.SocketMessage
|
||||
writeChan chan socket.SocketMessage
|
||||
|
||||
disconnectedChan chan struct{}
|
||||
reconnectedChan chan socket.Conn
|
||||
|
||||
crw socket.ClientReadWriter
|
||||
|
||||
validated atomic.Value
|
||||
}
|
||||
|
||||
func (c *Connectors) Connect() (readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage, err error) {
|
||||
var (
|
||||
conn socket.Conn
|
||||
)
|
||||
|
||||
if nil != c.stopChan {
|
||||
return nil, nil, fmt.Errorf("%s already connected", c.logHeader())
|
||||
}
|
||||
|
||||
conn, err = c.connect()
|
||||
if nil != err {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
c.readChan = make(chan socket.SocketMessage, 256)
|
||||
c.writeChan = make(chan socket.SocketMessage, 256)
|
||||
c.disconnectedChan = make(chan struct{})
|
||||
c.reconnectedChan = make(chan socket.Conn)
|
||||
c.stopChan = make(chan struct{})
|
||||
|
||||
c.crw.ReadwriteHandler = c
|
||||
c.crw.ReadChan = c.readChan
|
||||
c.crw.WriteChan = c.writeChan
|
||||
c.crw.ClientStopChan = c.stopChan
|
||||
c.crw.ClientStopWg = &c.stopWg
|
||||
c.crw.DisconnectedChan = c.disconnectedChan
|
||||
c.crw.ReconnectedChan = c.reconnectedChan
|
||||
|
||||
c.stopWg.Add(1)
|
||||
go c.handleReconnect()
|
||||
c.stopWg.Add(1)
|
||||
go c.crw.HandleConnection(conn)
|
||||
|
||||
return c.readChan, c.writeChan, nil
|
||||
}
|
||||
|
||||
func (c *Connectors) Disconnect() error {
|
||||
if c.stopChan == nil {
|
||||
return fmt.Errorf("%s must be connected before disconnection it", c.logHeader())
|
||||
}
|
||||
close(c.stopChan)
|
||||
c.stopWg.Wait()
|
||||
|
||||
c.stopChan = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Connectors) logHeader() string {
|
||||
return fmt.Sprintf("Connector[%s]: ", c.Name)
|
||||
}
|
||||
|
||||
func (c *Connectors) onDisconnected() {
|
||||
close(c.readChan)
|
||||
close(c.writeChan)
|
||||
|
||||
c.reconnectedChan <- nil
|
||||
|
||||
onDisconnected := c.OnDisconnected
|
||||
if nil != onDisconnected {
|
||||
go func() {
|
||||
onDisconnected(c)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connectors) handleReconnect() {
|
||||
defer func() {
|
||||
c.stopWg.Done()
|
||||
}()
|
||||
|
||||
RC_LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-c.disconnectedChan:
|
||||
case <-c.stopChan:
|
||||
return
|
||||
}
|
||||
|
||||
if 0 >= c.GetReconnectTryTime() {
|
||||
c.onDisconnected()
|
||||
return
|
||||
}
|
||||
|
||||
olog.Logger().Debugf("%s connection lost", c.logHeader())
|
||||
|
||||
for indexI := 0; indexI < c.GetReconnectTryTime(); indexI++ {
|
||||
olog.Logger().Debugf("%s trying reconnect[%d]", c.logHeader(), indexI)
|
||||
|
||||
conn, err := c.connect()
|
||||
if nil == err {
|
||||
olog.Logger().Debugf("reconnected")
|
||||
c.reconnectedChan <- conn
|
||||
continue RC_LOOP
|
||||
}
|
||||
time.Sleep(c.GetReconnectInterval())
|
||||
}
|
||||
|
||||
olog.Logger().Debugf("%s reconnecting has been failed", c.logHeader())
|
||||
c.onDisconnected()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connectors) connect() (socket.Conn, error) {
|
||||
netConn, err := c.dial()
|
||||
if nil != err {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn := socket.NewConn(netConn, false, c.GetReadBufferSize(), c.GetWriteBufferSize())
|
||||
conn.SetCloseHandler(func(code int, text string) error {
|
||||
olog.Logger().Debugf("%s close", c.logHeader())
|
||||
return nil
|
||||
})
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (c *Connectors) dial() (net.Conn, error) {
|
||||
var deadline time.Time
|
||||
if 0 != c.GetHandshakeTimeout() {
|
||||
deadline = time.Now().Add(c.GetHandshakeTimeout())
|
||||
}
|
||||
|
||||
d := &net.Dialer{
|
||||
KeepAlive: c.GetKeepAlive(),
|
||||
Deadline: deadline,
|
||||
LocalAddr: c.LocalAddress,
|
||||
}
|
||||
|
||||
conn, err := d.Dial(c.Network, c.Address)
|
||||
if nil != err {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if nil != c.GetTLSConfig() {
|
||||
cfg := c.GetTLSConfig().Clone()
|
||||
tlsConn := tls.Client(conn, cfg)
|
||||
if err := tlsConn.Handshake(); err != nil {
|
||||
tlsConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
if !cfg.InsecureSkipVerify {
|
||||
if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
conn = tlsConn
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (c *Connectors) Clone() client.Connector {
|
||||
return &Connectors{
|
||||
Connectors: *c.Connectors.Clone(),
|
||||
Network: c.Network,
|
||||
Address: c.Address,
|
||||
LocalAddress: c.LocalAddress,
|
||||
validated: c.validated,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connectors) Validate() error {
|
||||
if nil != c.validated.Load() {
|
||||
return nil
|
||||
}
|
||||
c.validated.Store(true)
|
||||
|
||||
if err := c.Connectors.Validate(); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
if "" == c.Network {
|
||||
return fmt.Errorf("%s Network is not valid", c.logHeader())
|
||||
}
|
||||
|
||||
if "" == c.Address {
|
||||
return fmt.Errorf("%s Address is not valid", c.logHeader())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
94
socket/net/server-handler.go
Normal file
94
socket/net/server-handler.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync/atomic"
|
||||
|
||||
"git.loafle.net/overflow/server-go"
|
||||
"git.loafle.net/overflow/server-go/socket"
|
||||
)
|
||||
|
||||
type ServerHandler interface {
|
||||
socket.ServerHandler
|
||||
|
||||
OnError(serverCtx server.ServerCtx, conn net.Conn, status int, reason error)
|
||||
|
||||
RegisterServlet(servlet Servlet)
|
||||
Servlet(serverCtx server.ServerCtx, conn net.Conn) Servlet
|
||||
}
|
||||
|
||||
type ServerHandlers struct {
|
||||
socket.ServerHandlers
|
||||
|
||||
servlet Servlet
|
||||
|
||||
validated atomic.Value
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) Init(serverCtx server.ServerCtx) error {
|
||||
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
if nil != sh.servlet {
|
||||
if err := sh.servlet.Init(serverCtx); nil != err {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) OnStart(serverCtx server.ServerCtx) error {
|
||||
if err := sh.ServerHandlers.OnStart(serverCtx); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
if nil != sh.servlet {
|
||||
if err := sh.servlet.OnStart(serverCtx); nil != err {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) OnStop(serverCtx server.ServerCtx) {
|
||||
if nil != sh.servlet {
|
||||
sh.servlet.OnStop(serverCtx)
|
||||
}
|
||||
|
||||
sh.ServerHandlers.OnStop(serverCtx)
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) Destroy(serverCtx server.ServerCtx) {
|
||||
if nil != sh.servlet {
|
||||
sh.servlet.Destroy(serverCtx)
|
||||
}
|
||||
|
||||
sh.ServerHandlers.Destroy(serverCtx)
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) OnError(serverCtx server.ServerCtx, conn net.Conn, status int, reason error) {
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) RegisterServlet(servlet Servlet) {
|
||||
sh.servlet = servlet
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) Servlet(serverCtx server.ServerCtx, conn net.Conn) Servlet {
|
||||
return sh.servlet
|
||||
}
|
||||
|
||||
func (sh *ServerHandlers) Validate() error {
|
||||
if nil != sh.validated.Load() {
|
||||
return nil
|
||||
}
|
||||
sh.validated.Store(true)
|
||||
|
||||
if err := sh.ServerHandlers.Validate(); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
167
socket/net/server.go
Normal file
167
socket/net/server.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package net
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
olog "git.loafle.net/overflow/log-go"
|
||||
"git.loafle.net/overflow/server-go"
|
||||
"git.loafle.net/overflow/server-go/socket"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
ServerHandler ServerHandler
|
||||
|
||||
ctx server.ServerCtx
|
||||
stopChan chan struct{}
|
||||
stopWg sync.WaitGroup
|
||||
|
||||
srw socket.ServerReadWriter
|
||||
}
|
||||
|
||||
func (s *Server) ListenAndServe() error {
|
||||
if s.stopChan != nil {
|
||||
return fmt.Errorf("%s already running. Stop it before starting it again", s.logHeader())
|
||||
}
|
||||
|
||||
var (
|
||||
err error
|
||||
listener net.Listener
|
||||
)
|
||||
if nil == s.ServerHandler {
|
||||
return fmt.Errorf("%s server handler must be specified", s.logHeader())
|
||||
}
|
||||
if err = s.ServerHandler.Validate(); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
s.ctx = s.ServerHandler.ServerCtx()
|
||||
if nil == s.ctx {
|
||||
return fmt.Errorf("%s ServerCtx is nil", s.logHeader())
|
||||
}
|
||||
|
||||
if err = s.ServerHandler.Init(s.ctx); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
if listener, err = s.ServerHandler.Listener(s.ctx); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
s.stopChan = make(chan struct{})
|
||||
|
||||
s.srw.ReadwriteHandler = s.ServerHandler
|
||||
s.srw.ServerStopChan = s.stopChan
|
||||
s.srw.ServerStopWg = &s.stopWg
|
||||
|
||||
s.stopWg.Add(1)
|
||||
return s.handleServer(listener)
|
||||
}
|
||||
|
||||
func (s *Server) Shutdown(ctx context.Context) error {
|
||||
if s.stopChan == nil {
|
||||
return fmt.Errorf("%s must be started before stopping it", s.logHeader())
|
||||
}
|
||||
close(s.stopChan)
|
||||
s.stopWg.Wait()
|
||||
|
||||
s.ServerHandler.Destroy(s.ctx)
|
||||
|
||||
s.stopChan = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) logHeader() string {
|
||||
return fmt.Sprintf("Server[%s]:", s.ServerHandler.GetName())
|
||||
}
|
||||
|
||||
func (s *Server) handleServer(listener net.Listener) error {
|
||||
var (
|
||||
stopping atomic.Value
|
||||
netConn net.Conn
|
||||
err error
|
||||
)
|
||||
|
||||
defer func() {
|
||||
if nil != listener {
|
||||
listener.Close()
|
||||
}
|
||||
s.ServerHandler.OnStop(s.ctx)
|
||||
olog.Logger().Infof("%s Stopped", s.logHeader())
|
||||
s.stopWg.Done()
|
||||
}()
|
||||
|
||||
if err = s.ServerHandler.OnStart(s.ctx); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
olog.Logger().Infof("%s Started", s.logHeader())
|
||||
|
||||
for {
|
||||
acceptChan := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
if netConn, err = listener.Accept(); err != nil {
|
||||
if nil == stopping.Load() {
|
||||
olog.Logger().Errorf("%s %v", s.logHeader(), err)
|
||||
}
|
||||
}
|
||||
close(acceptChan)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-s.stopChan:
|
||||
stopping.Store(true)
|
||||
listener.Close()
|
||||
<-acceptChan
|
||||
listener = nil
|
||||
return nil
|
||||
case <-acceptChan:
|
||||
}
|
||||
|
||||
if nil != err {
|
||||
select {
|
||||
case <-s.stopChan:
|
||||
return nil
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if 0 < s.ServerHandler.GetConcurrency() {
|
||||
sz := s.srw.ConnectionSize()
|
||||
if sz >= s.ServerHandler.GetConcurrency() {
|
||||
olog.Logger().Warnf("%s max connections size %d, refuse", s.logHeader(), sz)
|
||||
netConn.Close()
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
servlet := s.ServerHandler.(ServerHandler).Servlet(s.ctx, netConn)
|
||||
if nil == servlet {
|
||||
olog.Logger().Errorf("%s Servlet is nil", s.logHeader())
|
||||
continue
|
||||
}
|
||||
|
||||
servletCtx := servlet.ServletCtx(s.ctx)
|
||||
if nil == servletCtx {
|
||||
olog.Logger().Errorf("%s ServletCtx is nil", s.logHeader())
|
||||
continue
|
||||
}
|
||||
|
||||
if err := servlet.Handshake(servletCtx, netConn); nil != err {
|
||||
olog.Logger().Infof("%s Handshaking of Client[%s] has been failed %v", s.logHeader(), netConn.RemoteAddr(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
conn := socket.NewConn(netConn, true, s.ServerHandler.GetReadBufferSize(), s.ServerHandler.GetWriteBufferSize())
|
||||
|
||||
s.stopWg.Add(1)
|
||||
go s.srw.HandleConnection(servlet, servletCtx, conn)
|
||||
}
|
||||
}
|
||||
54
socket/net/servlet.go
Normal file
54
socket/net/servlet.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package net
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"git.loafle.net/overflow/server-go"
|
||||
"git.loafle.net/overflow/server-go/socket"
|
||||
)
|
||||
|
||||
type Servlet interface {
|
||||
socket.Servlet
|
||||
|
||||
Handshake(servletCtx server.ServletCtx, conn net.Conn) error
|
||||
}
|
||||
|
||||
type Servlets struct {
|
||||
Servlet
|
||||
}
|
||||
|
||||
func (s *Servlets) ServletCtx(serverCtx server.ServerCtx) server.ServletCtx {
|
||||
return server.NewServletContext(nil, serverCtx)
|
||||
}
|
||||
|
||||
func (s *Servlets) Init(serverCtx server.ServerCtx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Servlets) OnStart(serverCtx server.ServerCtx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Servlets) OnStop(serverCtx server.ServerCtx) {
|
||||
//
|
||||
}
|
||||
|
||||
func (s *Servlets) Destroy(serverCtx server.ServerCtx) {
|
||||
//
|
||||
}
|
||||
|
||||
func (s *Servlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Servlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
|
||||
//
|
||||
}
|
||||
|
||||
func (s *Servlets) Handle(servletCtx server.ServletCtx, stopChan <-chan struct{}, doneChan chan<- struct{}, readChan <-chan socket.SocketMessage, writeChan chan<- socket.SocketMessage) {
|
||||
|
||||
}
|
||||
|
||||
func (s *Servlets) OnDisconnect(servletCtx server.ServletCtx) {
|
||||
//
|
||||
}
|
||||
Reference in New Issue
Block a user