From 6f38baf3ae3455679b50e39559d455d76018cad4 Mon Sep 17 00:00:00 2001 From: crusader Date: Thu, 3 May 2018 15:30:23 +0900 Subject: [PATCH] ing --- client/client-handler.go | 57 ++++++++++++++++++++++++++++++++++++ client/client.go | 49 +++++++++++++++++++++++++++++++ main.go | 12 ++++---- server/server-handler.go | 48 ------------------------------ server/server.go | 48 ------------------------------ servlet/discovery-servlet.go | 55 ---------------------------------- 6 files changed, 112 insertions(+), 157 deletions(-) create mode 100644 client/client-handler.go create mode 100644 client/client.go delete mode 100644 server/server-handler.go delete mode 100644 server/server.go delete mode 100644 servlet/discovery-servlet.go diff --git a/client/client-handler.go b/client/client-handler.go new file mode 100644 index 0000000..deff280 --- /dev/null +++ b/client/client-handler.go @@ -0,0 +1,57 @@ +package client + +import ( + "sync/atomic" + + crc "git.loafle.net/commons/rpc-go/client" + occ "git.loafle.net/overflow/container-go/client" +) + +type ClientHandler interface { + occ.ClientHandler +} + +type ClientHandlers struct { + occ.ClientHandlers + + validated atomic.Value +} + +func (ch *ClientHandlers) Init(clientCtx crc.ClientCtx) error { + if err := ch.ClientHandlers.Init(clientCtx); nil != err { + return err + } + + return nil +} + +func (ch *ClientHandlers) OnStart(clientCtx crc.ClientCtx) error { + if err := ch.ClientHandlers.OnStart(clientCtx); nil != err { + return err + } + + return nil +} + +func (ch *ClientHandlers) OnStop(clientCtx crc.ClientCtx) { + + ch.ClientHandlers.OnStop(clientCtx) +} + +func (ch *ClientHandlers) Destroy(clientCtx crc.ClientCtx) { + + ch.ClientHandlers.Destroy(clientCtx) +} + +func (ch *ClientHandlers) Validate() error { + if nil != ch.validated.Load() { + return nil + } + ch.validated.Store(true) + + if err := ch.ClientHandlers.Validate(); nil != err { + return err + } + + return nil +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..afb07ac --- /dev/null +++ b/client/client.go @@ -0,0 +1,49 @@ +package client + +import ( + cdr "git.loafle.net/commons/di-go/registry" + "git.loafle.net/commons/logging-go" + crc "git.loafle.net/commons/rpc-go/client" + crpj "git.loafle.net/commons/rpc-go/protocol/json" + crr "git.loafle.net/commons/rpc-go/registry" + occp "git.loafle.net/overflow/commons-go/config/probe" + occa "git.loafle.net/overflow/commons-go/core/annotation" + "git.loafle.net/overflow/container-go" + occ "git.loafle.net/overflow/container-go/client" + "git.loafle.net/overflow/container_discovery/crawler" + "git.loafle.net/overflow/container_discovery/service" +) + +func New(portNumber int) *crc.Client { + rpcWriteChan := make(chan []byte, 256) + rpcClientCodec := crpj.NewClientCodec() + + cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers()) + cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan) + cdr.RegisterResource(container.CONTAINER_RPC_CLIENT_CODEC, rpcClientCodec) + + services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) + if nil != err { + logging.Logger().Panic(err) + } + + rpcRegistry := crr.NewRPCRegistry() + rpcRegistry.RegisterServices(services...) + + connector, err := occ.NewConnector(occp.ContainerDiscovery, portNumber) + if nil != err { + logging.Logger().Panic(err) + } + + ch := &ClientHandlers{} + ch.Name = occp.ContainerDiscovery.String() + ch.Connector = connector + ch.RPCCodec = rpcClientCodec + ch.RPCInvoker = rpcRegistry + ch.Services = services + ch.OrderedServices = service.OrderedServices + + return &crc.Client{ + ClientHandler: ch, + } +} diff --git a/main.go b/main.go index 14c3e35..494f30e 100644 --- a/main.go +++ b/main.go @@ -11,15 +11,15 @@ import ( "git.loafle.net/commons/logging-go" occp "git.loafle.net/overflow/commons-go/config/probe" - "git.loafle.net/overflow/container_discovery/server" + "git.loafle.net/overflow/container_discovery/client" ) var ( - pidFilePath *string + portNumber *int ) func init() { - pidFilePath = flag.String(occp.FlagPidFilePathName, "./dist/discovery.pid", "PID file path") + portNumber = flag.Int(occp.FlagProbePortName, 0, "Port number of Probe") loggingConfigFilePath := flag.String(occp.FlagLoggingConfigFilePathName, "", "logging config path") flag.Parse() @@ -29,10 +29,10 @@ func init() { func main() { defer logging.Logger().Sync() - s := server.New(*pidFilePath) + c := client.New(*portNumber) go func() { - err := s.ListenAndServe() + err := c.Start() if nil != err { log.Printf("err: %v", err) } @@ -51,7 +51,7 @@ func main() { <-interrupt ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := s.Shutdown(ctx); err != nil { + if err := c.Stop(ctx); err != nil { logging.Logger().Errorf("error: %v", err) } } diff --git a/server/server-handler.go b/server/server-handler.go deleted file mode 100644 index e74bc91..0000000 --- a/server/server-handler.go +++ /dev/null @@ -1,48 +0,0 @@ -package server - -import ( - cs "git.loafle.net/commons/server-go" - ocs "git.loafle.net/overflow/container-go/server" -) - -type ServerHandler interface { - ocs.ServerHandler -} - -type ServerHandlers struct { - ocs.ServerHandlers -} - -func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error { - if err := sh.ServerHandlers.Init(serverCtx); nil != err { - return err - } - - return nil -} - -func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error { - if err := sh.ServerHandlers.OnStart(serverCtx); nil != err { - return err - } - - return nil -} - -func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) { - - sh.ServerHandlers.OnStop(serverCtx) -} - -func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) { - - sh.ServerHandlers.Destroy(serverCtx) -} - -func (sh *ServerHandlers) Validate() error { - if err := sh.ServerHandlers.Validate(); nil != err { - return err - } - - return nil -} diff --git a/server/server.go b/server/server.go deleted file mode 100644 index a095233..0000000 --- a/server/server.go +++ /dev/null @@ -1,48 +0,0 @@ -package server - -import ( - cdr "git.loafle.net/commons/di-go/registry" - logging "git.loafle.net/commons/logging-go" - crpj "git.loafle.net/commons/rpc-go/protocol/json" - crr "git.loafle.net/commons/rpc-go/registry" - cssn "git.loafle.net/commons/server-go/socket/net" - occa "git.loafle.net/overflow/commons-go/core/annotation" - "git.loafle.net/overflow/container-go" - "git.loafle.net/overflow/container_discovery/crawler" - "git.loafle.net/overflow/container_discovery/service" - "git.loafle.net/overflow/container_discovery/servlet" -) - -func New(pidFilePath string) *cssn.Server { - rpcWriteChan := make(chan *container.RPCNotification, 256) - - cdr.RegisterResource(container.CONTAINER_CRAWLERS, crawler.GetCrawlers()) - cdr.RegisterResource(container.CONTAINER_RPC_WRITE_CHAN, rpcWriteChan) - - services, err := cdr.GetInstancesByAnnotationType(occa.RPCServiceAnnotationType) - if nil != err { - logging.Logger().Panic(err) - } - - rpcRegistry := crr.NewRPCRegistry() - rpcRegistry.RegisterServices(services...) - - ds := &servlet.DiscoveryServlets{} - ds.RPCInvoker = rpcRegistry - ds.RPCWriteChan = rpcWriteChan - ds.RPCServerCodec = crpj.NewServerCodec() - - sh := &ServerHandlers{} - sh.Name = "Container Discovery" - sh.PIDFilePath = pidFilePath - sh.Services = services - sh.OrderedServices = service.OrderedServices - - sh.RegisterServlet(ds) - - s := &cssn.Server{ - ServerHandler: sh, - } - - return s -} diff --git a/servlet/discovery-servlet.go b/servlet/discovery-servlet.go deleted file mode 100644 index 7614327..0000000 --- a/servlet/discovery-servlet.go +++ /dev/null @@ -1,55 +0,0 @@ -package servlet - -import ( - "net" - - "git.loafle.net/commons/server-go" - css "git.loafle.net/commons/server-go/socket" - ocs "git.loafle.net/overflow/container-go/servlet" -) - -type DiscoveryServlet interface { - ocs.RPCServlet -} - -type DiscoveryServlets struct { - ocs.RPCServlets -} - -func (s *DiscoveryServlets) Init(serverCtx server.ServerCtx) error { - if err := s.RPCServlets.Init(serverCtx); nil != err { - return err - } - - return nil -} - -func (s *DiscoveryServlets) OnStart(serverCtx server.ServerCtx) error { - if err := s.RPCServlets.OnStart(serverCtx); nil != err { - return err - } - - return nil -} - -func (s *DiscoveryServlets) OnStop(serverCtx server.ServerCtx) { - - s.RPCServlets.OnStop(serverCtx) -} - -func (s *DiscoveryServlets) Destroy(serverCtx server.ServerCtx) { - - s.RPCServlets.Destroy(serverCtx) -} - -func (s *DiscoveryServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error { - return nil -} - -func (s *DiscoveryServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) { - s.RPCServlets.OnConnect(servletCtx, conn) -} - -func (s *DiscoveryServlets) OnDisconnect(servletCtx server.ServletCtx) { - s.RPCServlets.OnDisconnect(servletCtx) -}