This commit is contained in:
crusader 2018-05-03 15:30:23 +09:00
parent 7a76fcd0e2
commit 6f38baf3ae
6 changed files with 112 additions and 157 deletions

57
client/client-handler.go Normal file
View File

@ -0,0 +1,57 @@
package client
import (
"sync/atomic"
crc "git.loafle.net/commons/rpc-go/client"
occ "git.loafle.net/overflow/container-go/client"
)
type ClientHandler interface {
occ.ClientHandler
}
type ClientHandlers struct {
occ.ClientHandlers
validated atomic.Value
}
func (ch *ClientHandlers) Init(clientCtx crc.ClientCtx) error {
if err := ch.ClientHandlers.Init(clientCtx); nil != err {
return err
}
return nil
}
func (ch *ClientHandlers) OnStart(clientCtx crc.ClientCtx) error {
if err := ch.ClientHandlers.OnStart(clientCtx); nil != err {
return err
}
return nil
}
func (ch *ClientHandlers) OnStop(clientCtx crc.ClientCtx) {
ch.ClientHandlers.OnStop(clientCtx)
}
func (ch *ClientHandlers) Destroy(clientCtx crc.ClientCtx) {
ch.ClientHandlers.Destroy(clientCtx)
}
func (ch *ClientHandlers) Validate() error {
if nil != ch.validated.Load() {
return nil
}
ch.validated.Store(true)
if err := ch.ClientHandlers.Validate(); nil != err {
return err
}
return nil
}

49
client/client.go Normal file
View File

@ -0,0 +1,49 @@
package client
import (
cdr "git.loafle.net/commons/di-go/registry"
"git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry"
occp "git.loafle.net/overflow/commons-go/config/probe"
occa "git.loafle.net/overflow/commons-go/core/annotation"
"git.loafle.net/overflow/container-go"
occ "git.loafle.net/overflow/container-go/client"
"git.loafle.net/overflow/container_discovery/crawler"
"git.loafle.net/overflow/container_discovery/service"
)
func New(portNumber int) *crc.Client {
rpcWriteChan := make(chan []byte, 256)
rpcClientCodec := crpj.NewClientCodec()
cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers())
cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan)
cdr.RegisterResource(container.CONTAINER_RPC_CLIENT_CODEC, rpcClientCodec)
services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType)
if nil != err {
logging.Logger().Panic(err)
}
rpcRegistry := crr.NewRPCRegistry()
rpcRegistry.RegisterServices(services...)
connector, err := occ.NewConnector(occp.ContainerDiscovery, portNumber)
if nil != err {
logging.Logger().Panic(err)
}
ch := &ClientHandlers{}
ch.Name = occp.ContainerDiscovery.String()
ch.Connector = connector
ch.RPCCodec = rpcClientCodec
ch.RPCInvoker = rpcRegistry
ch.Services = services
ch.OrderedServices = service.OrderedServices
return &crc.Client{
ClientHandler: ch,
}
}

12
main.go
View File

@ -11,15 +11,15 @@ import (
"git.loafle.net/commons/logging-go"
occp "git.loafle.net/overflow/commons-go/config/probe"
"git.loafle.net/overflow/container_discovery/server"
"git.loafle.net/overflow/container_discovery/client"
)
var (
pidFilePath *string
portNumber *int
)
func init() {
pidFilePath = flag.String(occp.FlagPidFilePathName, "./dist/discovery.pid", "PID file path")
portNumber = flag.Int(occp.FlagProbePortName, 0, "Port number of Probe")
loggingConfigFilePath := flag.String(occp.FlagLoggingConfigFilePathName, "", "logging config path")
flag.Parse()
@ -29,10 +29,10 @@ func init() {
func main() {
defer logging.Logger().Sync()
s := server.New(*pidFilePath)
c := client.New(*portNumber)
go func() {
err := s.ListenAndServe()
err := c.Start()
if nil != err {
log.Printf("err: %v", err)
}
@ -51,7 +51,7 @@ func main() {
<-interrupt
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
if err := c.Stop(ctx); err != nil {
logging.Logger().Errorf("error: %v", err)
}
}

View File

@ -1,48 +0,0 @@
package server
import (
cs "git.loafle.net/commons/server-go"
ocs "git.loafle.net/overflow/container-go/server"
)
type ServerHandler interface {
ocs.ServerHandler
}
type ServerHandlers struct {
ocs.ServerHandlers
}
func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.OnStart(serverCtx); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) {
sh.ServerHandlers.OnStop(serverCtx)
}
func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) {
sh.ServerHandlers.Destroy(serverCtx)
}
func (sh *ServerHandlers) Validate() error {
if err := sh.ServerHandlers.Validate(); nil != err {
return err
}
return nil
}

View File

@ -1,48 +0,0 @@
package server
import (
cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
crr "git.loafle.net/commons/rpc-go/registry"
cssn "git.loafle.net/commons/server-go/socket/net"
occa "git.loafle.net/overflow/commons-go/core/annotation"
"git.loafle.net/overflow/container-go"
"git.loafle.net/overflow/container_discovery/crawler"
"git.loafle.net/overflow/container_discovery/service"
"git.loafle.net/overflow/container_discovery/servlet"
)
func New(pidFilePath string) *cssn.Server {
rpcWriteChan := make(chan *container.RPCNotification, 256)
cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers())
cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan)
services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType)
if nil != err {
logging.Logger().Panic(err)
}
rpcRegistry := crr.NewRPCRegistry()
rpcRegistry.RegisterServices(services...)
ds := &servlet.DiscoveryServlets{}
ds.RPCInvoker = rpcRegistry
ds.RPCWriteChan = rpcWriteChan
ds.RPCServerCodec = crpj.NewServerCodec()
sh := &ServerHandlers{}
sh.Name = "Container Discovery"
sh.PIDFilePath = pidFilePath
sh.Services = services
sh.OrderedServices = service.OrderedServices
sh.RegisterServlet(ds)
s := &cssn.Server{
ServerHandler: sh,
}
return s
}

View File

@ -1,55 +0,0 @@
package servlet
import (
"net"
"git.loafle.net/commons/server-go"
css "git.loafle.net/commons/server-go/socket"
ocs "git.loafle.net/overflow/container-go/servlet"
)
type DiscoveryServlet interface {
ocs.RPCServlet
}
type DiscoveryServlets struct {
ocs.RPCServlets
}
func (s *DiscoveryServlets) Init(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.Init(serverCtx); nil != err {
return err
}
return nil
}
func (s *DiscoveryServlets) OnStart(serverCtx server.ServerCtx) error {
if err := s.RPCServlets.OnStart(serverCtx); nil != err {
return err
}
return nil
}
func (s *DiscoveryServlets) OnStop(serverCtx server.ServerCtx) {
s.RPCServlets.OnStop(serverCtx)
}
func (s *DiscoveryServlets) Destroy(serverCtx server.ServerCtx) {
s.RPCServlets.Destroy(serverCtx)
}
func (s *DiscoveryServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error {
return nil
}
func (s *DiscoveryServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) {
s.RPCServlets.OnConnect(servletCtx, conn)
}
func (s *DiscoveryServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.RPCServlets.OnDisconnect(servletCtx)
}