diff --git a/server_context.go b/server_context.go new file mode 100644 index 0000000..8308fdd --- /dev/null +++ b/server_context.go @@ -0,0 +1,20 @@ +package server + +import ( + cuc "git.loafle.net/commons_go/util/context" +) + +type ServerContext interface { + cuc.Context +} + +type serverContext struct { + cuc.Context +} + +func newServerContext() ServerContext { + sCTX := &serverContext{} + sCTX.Context = cuc.NewContext(nil) + + return sCTX +} diff --git a/server_handler.go b/server_handler.go index 3ed7127..3c37272 100644 --- a/server_handler.go +++ b/server_handler.go @@ -5,13 +5,14 @@ import ( ) type ServerHandler interface { - Listen() (net.Listener, error) - OnAccept(conn net.Conn) (net.Conn, error) + ServerContext() ServerContext + Init(serverCTX ServerContext) error + Listen(serverCTX ServerContext) (net.Listener, error) + OnStart(serverCTX ServerContext) - OnStart() - OnStop() + OnError(serverCTX ServerContext, conn net.Conn, status int, reason error) - Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) + OnStop(serverCTX ServerContext) IsClientDisconnect(err error) bool diff --git a/server_handlers.go b/server_handlers.go index fe7e2b7..0bd260e 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -10,18 +10,38 @@ import ( type ServerHandlers struct { } -func (sh *ServerHandlers) OnStart() { +func (sh *ServerHandlers) ServerContext() ServerContext { + +} + +func (sh *ServerHandlers) Init(serverCTX ServerContext) error { // no op } -func (sh *ServerHandlers) OnStop() { - // no op -} - -func (sh *ServerHandlers) Listen() (net.Listener, error) { +func (sh *ServerHandlers) Listen(serverCTX ServerContext) (net.Listener, error) { return nil, errors.New("Server: Handler method[Listen] of Server is not implement") } +func (sh *ServerHandlers) OnStart(serverCTX ServerContext) { + // no op +} + +func (sh *ServerHandlers) OnAccept(conn net.Conn) (net.Conn, error) { + +} + +func (sh *ServerHandlers) Handle(serverCTX ServerContext, conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { + +} + +func (sh *ServerHandlers) OnError(serverCTX ServerContext, conn net.Conn, status int, reason error) { + +} + +func (sh *ServerHandlers) OnStop(serverCTX ServerContext) { + // no op +} + func (sh *ServerHandlers) OnAccept(conn net.Conn) (net.Conn, error) { return conn, nil } diff --git a/socket.go b/socket.go new file mode 100644 index 0000000..07c9111 --- /dev/null +++ b/socket.go @@ -0,0 +1,75 @@ +package server + +import ( + "io" + "net" + "sync" + "time" + +) + +type Socket interface { + // ID returns the identity of the client. + ID() string + Context() SocketContext + + net.Conn +} + +func newSocket(socketHandler SocketHandler, socketCTX SocketContext, conn net.Conn, id string) Socket { + s := retainSocket() + s.Conn = conn + s.sh = socketHandler + s.id = id + s.SetReadLimit(socketHandler.GetMaxMessageSize()) + if 0 < socketHandler.GetReadTimeout() { + s.SetReadDeadline(time.Now().Add(socketHandler.GetReadTimeout() * time.Second)) + } + s.ctx = socketCTX + + return s +} + +type netSocket struct { + net.Conn + + ctx SocketContext + sh SocketHandler + + id string + + // sc *SocketConn +} + +func (s *netSocket) Context() SocketContext { + return s.ctx +} + +func (s *netSocket) ID() string { + return s.id +} + + +func (s *netSocket) Close() error { + err := s.Conn.Close() + releaseSocket(s) + return err +} + +var socketPool sync.Pool + +func retainSocket() *netSocket { + v := socketPool.Get() + if v == nil { + return &netSocket{} + } + return v.(*netSocket) +} + +func releaseSocket(s *netSocket) { + s.sh = nil + s.ctx = nil + s.id = "" + + socketPool.Put(s) +} diff --git a/socket_context.go b/socket_context.go new file mode 100644 index 0000000..54778f4 --- /dev/null +++ b/socket_context.go @@ -0,0 +1,28 @@ +package server + +import ( + cuc "git.loafle.net/commons_go/util/context" +) + +type SocketContext interface { + cuc.Context + ServerContext() ServerContext +} + +func newSocketContext(serverCTX ServerContext) SocketContext { + sCTX := &socketContext{} + sCTX.Context = cuc.NewContext(serverCTX) + sCTX.serverCTX = serverCTX + + return sCTX +} + +type socketContext struct { + cuc.Context + + serverCTX ServerContext +} + +func (sc *socketContext) ServerContext() ServerContext { + return sc.serverCTX +} diff --git a/socket_handler.go b/socket_handler.go new file mode 100644 index 0000000..312836c --- /dev/null +++ b/socket_handler.go @@ -0,0 +1,70 @@ +package server + +import ( + "net" + "time" +) + +type SocketHandler interface { + SocketContext(serverCTX ServerContext) SocketContext + // Init invoked when server is stated + // If you override ths method, must call + // + // func (sh *SocketHandler) Init() error { + // if err := sh.SocketHandlers.Init(); nil != err { + // return err + // } + // ... + // ... + // return nil + // } + Init(serverCTX ServerContext) error + // Handshake do handshake client and server + // id is identity of client socket. if id is "", disallow connection + Handshake(serverCTX ServerContext, conn net.Conn) (id string) + // OnConnect invoked when client is connected + // If you override ths method, must call + // + // func (sh *SocketHandler) OnConnect(soc cwf.Socket) cwf.Socket { + // soc = sh.SocketHandlers.OnConnect(newSoc) + // newSoc := ... + // return newSoc + // } + OnConnect(soc Socket) + Handle(soc Socket, stopChan <-chan struct{}, doneChan chan<- error) + // OnDisconnect invoked when client is disconnected + // If you override ths method, must call + // + // func (sh *SocketHandler) OnDisconnect(soc cwf.Socket) { + // ... + // sh.SocketHandlers.OnDisconnect(soc) + // } + OnDisconnect(soc Socket) + // Destroy invoked when server is stopped + // If you override ths method, must call + // + // func (sh *SocketHandler) Destroy() { + // ... + // sh.SocketHandlers.Destroy() + // } + Destroy() + + GetSocket(id string) Socket + GetSockets() map[string]Socket + + GetMaxMessageSize() int64 + GetWriteTimeout() time.Duration + GetReadTimeout() time.Duration + + // Validate is check handler value + // If you override ths method, must call + // + // func (sh *SocketHandlers) Validate() { + // sh.SocketHandlers.Validate() + // ... + // } + Validate() + + addSocket(soc Socket) + removeSocket(soc Socket) +} diff --git a/socket_handlers.go b/socket_handlers.go new file mode 100644 index 0000000..2216dfc --- /dev/null +++ b/socket_handlers.go @@ -0,0 +1,92 @@ +package server + +import ( + "net" + "time" +) + +type SocketHandlers struct { + // MaxMessageSize is the maximum size for a message read from the peer. If a + // message exceeds the limit, the connection sends a close frame to the peer + // and returns ErrReadLimit to the application. + MaxMessageSize int64 + // WriteTimeout is the write deadline on the underlying network + // connection. After a write has timed out, the websocket state is corrupt and + // all future writes will return an error. A zero value for t means writes will + // not time out. + WriteTimeout time.Duration + // ReadTimeout is the read deadline on the underlying network connection. + // After a read has timed out, the websocket connection state is corrupt and + // all future reads will return an error. A zero value for t means reads will + // not time out. + ReadTimeout time.Duration + + sockets map[string]Socket +} + +func (sh *SocketHandlers) Init(serverCTX ServerContext) error { + sh.sockets = make(map[string]Socket) + + return nil +} + +func (sh *SocketHandlers) Handshake(serverCTX ServerContext, conn net.Conn) (id string) { + return "", nil +} + +func (sh *SocketHandlers) SocketContext(serverCTX ServerContext) SocketContext { + return newSocketContext(serverCTX) +} + +func (sh *SocketHandlers) OnConnect(soc Socket) { + // no op +} + +func (sh *SocketHandlers) Handle(soc Socket, stopChan <-chan struct{}, doneChan chan<- error) { + // no op +} + +func (sh *SocketHandlers) OnDisconnect(soc Socket) { + +} + +func (sh *SocketHandlers) Destroy() { + // no op +} + +func (sh *SocketHandlers) GetSocket(id string) Socket { + return sh.sockets[id] +} +func (sh *SocketHandlers) GetSockets() map[string]Socket { + return sh.sockets +} + +func (sh *SocketHandlers) GetMaxMessageSize() int64 { + return sh.MaxMessageSize +} +func (sh *SocketHandlers) GetWriteTimeout() time.Duration { + return sh.WriteTimeout +} +func (sh *SocketHandlers) GetReadTimeout() time.Duration { + return sh.ReadTimeout +} + +func (sh *SocketHandlers) Validate() { + if sh.MaxMessageSize <= 0 { + sh.MaxMessageSize = DefaultMaxMessageSize + } + if sh.WriteTimeout <= 0 { + sh.WriteTimeout = DefaultWriteTimeout + } + if sh.ReadTimeout <= 0 { + sh.ReadTimeout = DefaultReadTimeout + } +} + +func (sh *SocketHandlers) addSocket(soc Socket) { + sh.sockets[soc.ID()] = soc +} + +func (sh *SocketHandlers) removeSocket(soc Socket) { + delete(sh.sockets, soc.ID()) +}