ing
This commit is contained in:
parent
52c900f0a2
commit
4163705d49
126
client.go
Normal file
126
client.go
Normal file
|
@ -0,0 +1,126 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.loafle.net/commons_go/logging"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
Start() error
|
||||
Stop()
|
||||
}
|
||||
|
||||
func NewClient(ch ClientHandler) Client {
|
||||
s := &client{
|
||||
clientHandler: ch,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
type client struct {
|
||||
clientHandler ClientHandler
|
||||
|
||||
Stats ConnStats
|
||||
|
||||
stopChan chan struct{}
|
||||
stopWg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (c *client) Start() error {
|
||||
if nil == c.clientHandler {
|
||||
panic("Client: client handler must be specified.")
|
||||
}
|
||||
c.clientHandler.Validate()
|
||||
|
||||
if c.stopChan != nil {
|
||||
panic("Client: client is already running. Stop it before starting it again")
|
||||
}
|
||||
c.stopChan = make(chan struct{})
|
||||
|
||||
c.stopWg.Add(1)
|
||||
go runClient(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) Stop() {
|
||||
if c.stopChan == nil {
|
||||
panic("Client: client must be started before stopping it")
|
||||
}
|
||||
close(c.stopChan)
|
||||
c.stopWg.Wait()
|
||||
c.stopChan = nil
|
||||
}
|
||||
|
||||
func runClient(c *client) {
|
||||
defer c.stopWg.Done()
|
||||
|
||||
var conn io.ReadWriteCloser
|
||||
var err error
|
||||
var stopping atomic.Value
|
||||
|
||||
for {
|
||||
dialChan := make(chan struct{})
|
||||
go func() {
|
||||
if conn, err = c.clientHandler.Dial(); err != nil {
|
||||
if stopping.Load() == nil {
|
||||
logging.Logger.Error(fmt.Sprintf("Client: [%s].Cannot establish rpc connection: [%s]", c.clientHandler.GetAddr(), err))
|
||||
}
|
||||
}
|
||||
close(dialChan)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-c.stopChan:
|
||||
stopping.Store(true)
|
||||
<-dialChan
|
||||
return
|
||||
case <-dialChan:
|
||||
c.Stats.incDialCalls()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.Stats.incDialErrors()
|
||||
select {
|
||||
case <-c.stopChan:
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
c.stopWg.Add(1)
|
||||
handleClientConnection(c, conn)
|
||||
|
||||
select {
|
||||
case <-c.stopChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleClientConnection(c *client, conn io.ReadWriteCloser) {
|
||||
if err := c.clientHandler.OnHandshake(c.clientHandler.GetAddr(), conn); nil != err {
|
||||
logging.Logger.Error(fmt.Sprintf("Client: [%s]. handshake error: [%s]", c.clientHandler.GetAddr(), err))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
clientStopChan := make(chan struct{})
|
||||
go c.clientHandler.Handle(conn, clientStopChan)
|
||||
|
||||
select {
|
||||
case <-c.stopChan:
|
||||
close(clientStopChan)
|
||||
conn.Close()
|
||||
return
|
||||
case <-clientStopChan:
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
12
client_handler.go
Normal file
12
client_handler.go
Normal file
|
@ -0,0 +1,12 @@
|
|||
package server
|
||||
|
||||
import "io"
|
||||
|
||||
type ClientHandler interface {
|
||||
Dial() (conn io.ReadWriteCloser, err error)
|
||||
OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error
|
||||
Handle(rwc io.ReadWriteCloser, stopChan chan struct{})
|
||||
|
||||
GetAddr() string
|
||||
Validate()
|
||||
}
|
77
client_handlers.go
Normal file
77
client_handlers.go
Normal file
|
@ -0,0 +1,77 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ClientHandlers struct {
|
||||
// Server address to connect to.
|
||||
//
|
||||
// The address format depends on the underlying transport provided
|
||||
// by Client.Dial. The following transports are provided out of the box:
|
||||
// * TCP - see NewTCPClient() and NewTCPServer().
|
||||
// * TLS - see NewTLSClient() and NewTLSServer().
|
||||
// * Unix sockets - see NewUnixClient() and NewUnixServer().
|
||||
//
|
||||
// By default TCP transport is used.
|
||||
Addr string
|
||||
|
||||
// The maximum number of pending requests in the queue.
|
||||
//
|
||||
// The number of pending requsts should exceed the expected number
|
||||
// of concurrent goroutines calling client's methods.
|
||||
// Otherwise a lot of ClientError.Overflow errors may appear.
|
||||
//
|
||||
// Default is DefaultPendingMessages.
|
||||
PendingRequests int
|
||||
|
||||
// Maximum request time.
|
||||
// Default value is DefaultRequestTimeout.
|
||||
RequestTimeout time.Duration
|
||||
|
||||
// Size of send buffer per each underlying connection in bytes.
|
||||
// Default value is DefaultBufferSize.
|
||||
SendBufferSize int
|
||||
|
||||
// Size of recv buffer per each underlying connection in bytes.
|
||||
// Default value is DefaultBufferSize.
|
||||
RecvBufferSize int
|
||||
|
||||
KeepAlivePeriod time.Duration
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) {
|
||||
return nil, errors.New("Client: Handler method[Dial] of Client is not implement")
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) {
|
||||
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) GetAddr() string {
|
||||
return ch.Addr
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) Validate() {
|
||||
if ch.PendingRequests <= 0 {
|
||||
ch.PendingRequests = DefaultPendingMessages
|
||||
}
|
||||
if ch.RequestTimeout <= 0 {
|
||||
ch.RequestTimeout = DefaultRequestTimeout
|
||||
}
|
||||
if ch.SendBufferSize <= 0 {
|
||||
ch.SendBufferSize = DefaultBufferSize
|
||||
}
|
||||
if ch.RecvBufferSize <= 0 {
|
||||
ch.RecvBufferSize = DefaultBufferSize
|
||||
}
|
||||
if ch.KeepAlivePeriod <= 0 {
|
||||
ch.KeepAlivePeriod = DefaultKeepAlivePeriod
|
||||
}
|
||||
}
|
26
constants.go
Normal file
26
constants.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
package server
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
// DefaultConcurrency is the default number of concurrent rpc calls
|
||||
// the server can process.
|
||||
DefaultConcurrency = 8 * 1024
|
||||
|
||||
// DefaultRequestTimeout is the default timeout for client request.
|
||||
DefaultRequestTimeout = 20 * time.Second
|
||||
|
||||
// DefaultPendingMessages is the default number of pending messages
|
||||
// handled by Client and Server.
|
||||
DefaultPendingMessages = 32 * 1024
|
||||
|
||||
// DefaultFlushDelay is the default delay between message flushes
|
||||
// on Client and Server.
|
||||
DefaultFlushDelay = -1
|
||||
|
||||
// DefaultBufferSize is the default size for Client and Server buffers.
|
||||
DefaultBufferSize = 64 * 1024
|
||||
|
||||
// DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection.
|
||||
DefaultKeepAlivePeriod = 0
|
||||
)
|
11
ipc/client_handlers.go
Normal file
11
ipc/client_handlers.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package ipc
|
||||
|
||||
import (
|
||||
"git.loafle.net/commons_go/server"
|
||||
)
|
||||
|
||||
type ClientHandlers struct {
|
||||
server.ClientHandlers
|
||||
|
||||
path string
|
||||
}
|
10
ipc/client_handlers_unix.go
Normal file
10
ipc/client_handlers_unix.go
Normal file
|
@ -0,0 +1,10 @@
|
|||
package ipc
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
)
|
||||
|
||||
func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) {
|
||||
return net.Dial("unix", ch.Addr)
|
||||
}
|
11
ipc/client_handlers_windows.go
Normal file
11
ipc/client_handlers_windows.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package ipc
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"gopkg.in/natefinch/npipe.v2"
|
||||
)
|
||||
|
||||
func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) {
|
||||
return npipe.Dial(ch.Addr)
|
||||
}
|
|
@ -4,6 +4,4 @@ import "git.loafle.net/commons_go/server"
|
|||
|
||||
type ServerHandlers struct {
|
||||
server.ServerHandlers
|
||||
|
||||
path string
|
||||
}
|
|
@ -5,8 +5,8 @@ import (
|
|||
"os"
|
||||
)
|
||||
|
||||
// Addr is ex:/tmp/server.sock
|
||||
func (sh *ServerHandlers) Listen() (l net.Listener, err error) {
|
||||
//sh.path = filepath.Join(os.TempDir(), sh.Addr)
|
||||
os.Remove(sh.Addr)
|
||||
l, err = net.ListenUnix("unix", &net.UnixAddr{Name: sh.Addr, Net: "unix"})
|
||||
|
|
@ -6,9 +6,8 @@ import (
|
|||
"gopkg.in/natefinch/npipe.v2"
|
||||
)
|
||||
|
||||
// Addr is ex:\\.\pipe\server
|
||||
func (sh *ServerHandlers) Listen() (net.Listener, error) {
|
||||
sh.path = `\\.\pipe\` + sh.Addr
|
||||
|
||||
return npipe.Listen(s.path)
|
||||
}
|
||||
|
|
@ -113,11 +113,11 @@ func runServer(s *server) {
|
|||
}
|
||||
|
||||
s.stopWg.Add(1)
|
||||
go handleConnection(s, conn, clientAddr)
|
||||
go handleServerConnection(s, conn, clientAddr)
|
||||
}
|
||||
}
|
||||
|
||||
func handleConnection(s *server, conn io.ReadWriteCloser, clientAddr string) {
|
||||
func handleServerConnection(s *server, conn io.ReadWriteCloser, clientAddr string) {
|
||||
defer s.stopWg.Done()
|
||||
logging.Logger.Debug(fmt.Sprintf("Server: Client[%s] is connected.", clientAddr))
|
||||
clientStopChan := make(chan struct{})
|
||||
|
|
|
@ -7,29 +7,6 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultConcurrency is the default number of concurrent rpc calls
|
||||
// the server can process.
|
||||
DefaultConcurrency = 8 * 1024
|
||||
|
||||
// DefaultRequestTimeout is the default timeout for client request.
|
||||
DefaultRequestTimeout = 20 * time.Second
|
||||
|
||||
// DefaultPendingMessages is the default number of pending messages
|
||||
// handled by Client and Server.
|
||||
DefaultPendingMessages = 32 * 1024
|
||||
|
||||
// DefaultFlushDelay is the default delay between message flushes
|
||||
// on Client and Server.
|
||||
DefaultFlushDelay = -1
|
||||
|
||||
// DefaultBufferSize is the default size for Client and Server buffers.
|
||||
DefaultBufferSize = 64 * 1024
|
||||
|
||||
// DefaultKeepAlivePeriod is the default time for KeepAlivePeriod of connection.
|
||||
DefaultKeepAlivePeriod = 0
|
||||
)
|
||||
|
||||
type ServerHandlers struct {
|
||||
// Address to listen to for incoming connections.
|
||||
//
|
||||
|
@ -66,31 +43,6 @@ func (sh *ServerHandlers) GetAddr() string {
|
|||
return sh.Addr
|
||||
}
|
||||
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
// func (sh *ServerHandlers) {
|
||||
|
||||
// }
|
||||
|
||||
func (sh *ServerHandlers) accept(l net.Listener) (io.ReadWriteCloser, string, error) {
|
||||
conn, err := l.Accept()
|
||||
if nil != err {
|
||||
|
|
22
tcp/client_handlers.go
Normal file
22
tcp/client_handlers.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package tcp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"git.loafle.net/commons_go/server"
|
||||
)
|
||||
|
||||
type ClientHandlers struct {
|
||||
server.ClientHandlers
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) {
|
||||
dialer := &net.Dialer{
|
||||
Timeout: ch.RequestTimeout * time.Second,
|
||||
KeepAlive: ch.KeepAlivePeriod * time.Second,
|
||||
}
|
||||
|
||||
return dialer.Dial("tcp", ch.Addr)
|
||||
}
|
25
tcp_tls/client_handlers.go
Normal file
25
tcp_tls/client_handlers.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package tcp_tls
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"git.loafle.net/commons_go/server"
|
||||
)
|
||||
|
||||
type ClientHandlers struct {
|
||||
server.ClientHandlers
|
||||
|
||||
tlsConfig *tls.Config
|
||||
}
|
||||
|
||||
func (ch *ClientHandlers) Dial() (conn io.ReadWriteCloser, err error) {
|
||||
dialer := &net.Dialer{
|
||||
Timeout: ch.RequestTimeout * time.Second,
|
||||
KeepAlive: ch.KeepAlivePeriod * time.Second,
|
||||
}
|
||||
|
||||
return tls.DialWithDialer(dialer, "tcp", ch.Addr, ch.tlsConfig)
|
||||
}
|
Loading…
Reference in New Issue
Block a user