server-go/server.go

167 lines
3.0 KiB
Go
Raw Normal View History

2018-04-03 08:55:48 +00:00
package server
import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"git.loafle.net/commons/logging-go"
)
// Server accepts connections on a listener supplied by its ServerHandler and
// hands each accepted connection to a servlet obtained from that handler.
type Server struct {
	// ServerHandler supplies the listener, the servlets and the lifecycle
	// callbacks. It must be set before ListenAndServe is called.
	ServerHandler ServerHandler

	// ctx is the context obtained from ServerHandler.ServerContext() when the
	// server starts; it is passed to every handler and servlet call.
	ctx ServerContext
	// servlets holds one entry per connection currently being served, keyed
	// by the servlet handling it.
	servlets sync.Map

	// stopChan is non-nil while the server is running; Shutdown closes it to
	// signal the accept loop and all connection goroutines to stop.
	stopChan chan struct{}
	// stopWg counts the accept loop plus every connection goroutine.
	stopWg sync.WaitGroup
}
// ListenAndServe initializes the server through its ServerHandler, opens the
// listener and then runs the accept loop, blocking until that loop returns.
//
// It returns an error if the server is already running, if the handler yields
// a nil ServerContext, or if Init, Listen or the accept loop fails. It panics
// when no ServerHandler has been set.
func (s *Server) ListenAndServe() error {
	if nil != s.stopChan {
		return fmt.Errorf("Server: server is already running. Stop it before starting it again")
	}

	var (
		ln  net.Listener
		err error
	)

	if s.ServerHandler == nil {
		panic("Server: server handler must be specified.")
	}
	s.ServerHandler.Validate()

	s.ctx = s.ServerHandler.ServerContext()
	if s.ctx == nil {
		return fmt.Errorf("Server: ServerContext is nil")
	}

	err = s.ServerHandler.Init(s.ctx)
	if err != nil {
		return fmt.Errorf("Server: Initialization of server has been failed %v", err)
	}

	ln, err = s.ServerHandler.Listen(s.ctx)
	if err != nil {
		return err
	}

	// From here on the server counts as running: stopChan is what Shutdown
	// closes, and the wait-group slot added here is released by handleLoop.
	s.stopChan = make(chan struct{})
	s.stopWg.Add(1)

	return s.handleLoop(ln)
}
// Shutdown stops a running server: it signals the accept loop and every
// connection goroutine to stop, waits for all of them to finish, and then
// calls ServerHandler.Destroy.
//
// It returns an error if the server has not been started.
//
// NOTE(review): ctx is accepted but not consulted; the call blocks until all
// goroutines have finished regardless of any deadline on ctx.
func (s *Server) Shutdown(ctx context.Context) error {
	if nil == s.stopChan {
		return fmt.Errorf("Server: server must be started before stopping it")
	}

	// Closing the channel broadcasts the stop signal to every goroutine that
	// selects on it.
	close(s.stopChan)
	s.stopWg.Wait()

	s.ServerHandler.Destroy(s.ctx)

	// A nil stopChan marks the server as stopped so it can be started again.
	s.stopChan = nil

	return nil
}
// ConnectionSize reports how many servlets are currently registered, i.e. how
// many connections are being served. It walks the whole servlet map on every
// call.
func (s *Server) ConnectionSize() int {
	count := 0

	s.servlets.Range(func(_, _ interface{}) bool {
		count++
		return true // keep iterating over every entry
	})

	return count
}
// handleLoop runs the accept loop on listener until the server is stopped.
//
// ServerHandler.OnStart is called first; if it fails its error is returned.
// ServerHandler.OnStop and stopWg.Done always run when the function returns.
// Every accepted connection is served by handleConnection in its own
// goroutine, unless GetMaxConnections is positive and that many connections
// are already being served, in which case the new connection is closed
// immediately. After a failed Accept the loop waits one second before trying
// again.
//
// The listener is closed on every return path, and a connection that was
// accepted at the same moment the stop signal arrived is closed rather than
// leaked.
func (s *Server) handleLoop(listener net.Listener) error {
	var (
		// stopping is set right before the listener is closed for shutdown so
		// the accept goroutine does not log the resulting Accept error.
		stopping atomic.Value
		// conn and err are written by the accept goroutine and only read
		// after acceptChan has been closed.
		conn net.Conn
		err  error
	)

	defer func() {
		s.ServerHandler.OnStop(s.ctx)
		s.stopWg.Done()
	}()

	if err = s.ServerHandler.OnStart(s.ctx); nil != err {
		// Nothing will ever accept on this listener; release it.
		listener.Close()
		return err
	}

	for {
		acceptChan := make(chan struct{})
		go func() {
			if conn, err = listener.Accept(); err != nil {
				if nil == stopping.Load() {
					logging.Logger().Errorf("Server: Cannot accept new connection: [%s]", err)
				}
			}
			close(acceptChan)
		}()

		select {
		case <-s.stopChan:
			stopping.Store(true)
			// Closing the listener unblocks the pending Accept.
			listener.Close()
			<-acceptChan
			// Accept may have succeeded just before the listener was closed;
			// that connection will never be served, so close it here.
			if nil == err && nil != conn {
				conn.Close()
			}
			return nil
		case <-acceptChan:
		}

		if nil != err {
			// Back off before retrying, but stay responsive to shutdown.
			select {
			case <-s.stopChan:
				listener.Close()
				return nil
			case <-time.After(time.Second):
			}
			continue
		}

		if limit := s.ServerHandler.GetMaxConnections(); 0 < limit {
			if sz := s.ConnectionSize(); sz >= limit {
				logging.Logger().Warnf("max connections size %d, refuse\n", sz)
				conn.Close()
				continue
			}
		}

		s.stopWg.Add(1)
		go s.handleConnection(conn)
	}
}
// handleConnection serves a single accepted connection with a servlet
// obtained from ServerHandler.Servlet and returns once that servlet is done.
//
// The servlet is registered in s.servlets while it runs. stopWg.Done is
// always called on return. If the handler yields a nil servlet, the error is
// logged and the connection is closed without being served.
func (s *Server) handleConnection(conn net.Conn) {
	defer s.stopWg.Done()

	servlet := s.ServerHandler.Servlet()
	if nil == servlet {
		logging.Logger().Errorf("Server: Servlet is nil")
		// Without a servlet nothing can serve the connection; calling Handle
		// on the nil value would panic, so release the connection and stop.
		conn.Close()
		return
	}

	s.servlets.Store(servlet, true)
	// Registered after stopWg.Done, so it runs before it on return.
	defer s.servlets.Delete(servlet)

	servletStopChan := make(chan struct{})
	doneChan := make(chan struct{})

	go servlet.Handle(s.ctx, conn, doneChan, servletStopChan)

	select {
	case <-doneChan:
		// The servlet finished on its own.
		close(servletStopChan)
		conn.Close()
	case <-s.stopChan:
		// Server shutdown: tell the servlet to stop, close the connection,
		// and wait for the servlet to signal that it is done.
		close(servletStopChan)
		conn.Close()
		<-doneChan
	}
}