This commit is contained in:
crusader 2018-05-03 18:54:38 +09:00
parent a35822cbd7
commit ba176f57f3
6 changed files with 116 additions and 162 deletions

57
client/client-handler.go Normal file
View File

@ -0,0 +1,57 @@
package client
import (
"sync/atomic"
crc "git.loafle.net/commons/rpc-go/client"
occ "git.loafle.net/overflow/container-go/client"
)
type ClientHandler interface {
occ.ClientHandler
}
type ClientHandlers struct {
occ.ClientHandlers
validated atomic.Value
}
func (ch *ClientHandlers) Init(clientCtx crc.ClientCtx) error {
if err := ch.ClientHandlers.Init(clientCtx); nil != err {
return err
}
return nil
}
func (ch *ClientHandlers) OnStart(clientCtx crc.ClientCtx) error {
if err := ch.ClientHandlers.OnStart(clientCtx); nil != err {
return err
}
return nil
}
func (ch *ClientHandlers) OnStop(clientCtx crc.ClientCtx) {
ch.ClientHandlers.OnStop(clientCtx)
}
func (ch *ClientHandlers) Destroy(clientCtx crc.ClientCtx) {
ch.ClientHandlers.Destroy(clientCtx)
}
func (ch *ClientHandlers) Validate() error {
if nil != ch.validated.Load() {
return nil
}
ch.validated.Store(true)
if err := ch.ClientHandlers.Validate(); nil != err {
return err
}
return nil
}

50
client/client.go Normal file
View File

@ -0,0 +1,50 @@
package client
import (
cdr "git.loafle.net/commons/di-go/registry"
"git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry"
occp "git.loafle.net/overflow/commons-go/config/probe"
occa "git.loafle.net/overflow/commons-go/core/annotation"
"git.loafle.net/overflow/container-go"
occ "git.loafle.net/overflow/container-go/client"
"git.loafle.net/overflow/container_network/crawler"
"git.loafle.net/overflow/container_network/service"
)
func New(portNumber int) *crc.Client {
rpcClientCodec := crpj.NewClientCodec()
connector, err := occ.NewConnector(occp.ContainerDiscovery, portNumber)
if nil != err {
logging.Logger().Panic(err)
}
ch := &ClientHandlers{}
ch.Name = occp.ContainerNetwork.String()
ch.Connector = connector
ch.RPCCodec = rpcClientCodec
c := &crc.Client{
ClientHandler: ch,
}
cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers())
cdr.RegisterResource(container.CONTAINER_RPC_CLIENT, c)
services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType)
if nil != err {
logging.Logger().Panic(err)
}
rpcRegistry := crr.NewRPCRegistry()
rpcRegistry.RegisterServices(services...)
ch.RPCInvoker = rpcRegistry
ch.Services = services
ch.OrderedServices = service.OrderedServices
return c
}

16
main.go
View File

@ -11,15 +11,15 @@ import (
"git.loafle.net/commons/logging-go"
occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/container_network/server"
"git.loafle.net/overflow/container_network/client"
)
var (
pidFilePath *string
portNumber *int
)
func init() {
pidFilePath = flag.String(occp.FlagPidFilePathName, "./dist/network.pid", "PID file path")
portNumber = flag.Int(occp.FlagProbePortName, 60000, "Port number of Probe")
loggingConfigFilePath := flag.String(occp.FlagLoggingConfigFilePathName, "", "logging config path")
flag.Parse()
@ -29,15 +29,13 @@ func init() {
func main() {
defer logging.Logger().Sync()
s := server.New(*pidFilePath)
c := client.New(*portNumber)
go func() {
err := s.ListenAndServe()
err := c.Start()
if nil != err {
log.Printf("err: %v", err)
}
os.Exit(1)
}()
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt,
@ -51,7 +49,7 @@ func main() {
<-interrupt
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
if err := c.Stop(ctx); err != nil {
logging.Logger().Errorf("error: %v", err)
}
}

View File

@ -1,48 +0,0 @@
package server
import (
cs "git.loafle.net/commons/server-go"
ocs "git.loafle.net/overflow/container-go/server"
)
type ServerHandler interface {
ocs.ServerHandler
}
type ServerHandlers struct {
ocs.ServerHandlers
}
func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.OnStart(serverCtx); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) {
sh.ServerHandlers.OnStop(serverCtx)
}
func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) {
sh.ServerHandlers.Destroy(serverCtx)
}
func (sh *ServerHandlers) Validate() error {
if err := sh.ServerHandlers.Validate(); nil != err {
return err
}
return nil
}

View File

@ -1,48 +0,0 @@
package server
import (
cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry"
cssn "git.loafle.net/commons/server-go/socket/net"
occa "git.loafle.net/overflow/commons-go/core/annotation"
"git.loafle.net/overflow/container-go"
"git.loafle.net/overflow/container_network/crawler"
"git.loafle.net/overflow/container_network/service"
"git.loafle.net/overflow/container_network/servlet"
)
func New(pidFilePath string) *cssn.Server {
rpcWriteChan := make(chan *container.RPCNotification, 256)
cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers())
cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan)
services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType)
if nil != err {
logging.Logger().Panic(err)
}
rpcRegistry := crr.NewRPCRegistry()
rpcRegistry.RegisterServices(services...)
ns := &servlet.NetworkServlets{}
ns.RPCInvoker = rpcRegistry
ns.RPCWriteChan = rpcWriteChan
ns.RPCServerCodec = crpj.NewServerCodec()
sh := &ServerHandlers{}
sh.Name = "Container Network"
sh.PIDFilePath = pidFilePath
sh.Services = services
sh.OrderedServices = service.OrderedServices
sh.RegisterServlet(ns)
s := &cssn.Server{
ServerHandler: sh,
}
return s
}

View File

@ -1,55 +0,0 @@
package servlet
import (
"net"
"git.loafle.net/commons/server-go"
css "git.loafle.net/commons/server-go/socket"
ocs "git.loafle.net/overflow/container-go/servlet"
)
type NetworkServlet interface {
ocs.RPCServlet
}
type NetworkServlets struct {
ocs.RPCServlets
}
func (s *NetworkServlets) Init(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.Init(serverCtx); nil != err {
return err
}
return nil
}
func (s *NetworkServlets) OnStart(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.OnStart(serverCtx); nil != err {
return err
}
return nil
}
func (s *NetworkServlets) OnStop(serverCtx server.ServerCtx) {
s.RPCServlets.OnStop(serverCtx)
}
func (s *NetworkServlets) Destroy(serverCtx server.ServerCtx) {
s.RPCServlets.Destroy(serverCtx)
}
func (s *NetworkServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error {
return nil
}
func (s *NetworkServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) {
s.RPCServlets.OnConnect(servletCtx, conn)
}
func (s *NetworkServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.RPCServlets.OnDisconnect(servletCtx)
}