diff --git a/client/client-handler.go b/client/client-handler.go new file mode 100644 index 0000000..3c035b4 --- /dev/null +++ b/client/client-handler.go @@ -0,0 +1,85 @@ +package client + +import ( + "fmt" + "reflect" + "sync/atomic" + + logging "git.loafle.net/commons/logging-go" + crc "git.loafle.net/commons/rpc-go/client" + occi "git.loafle.net/overflow/commons-go/core/interfaces" +) + +type ClientHandler interface { + crc.ClientHandler +} + +type ClientHandlers struct { + crc.ClientHandlers + + Services []interface{} + OrderedServices []reflect.Type + + validated atomic.Value +} + +func (ch *ClientHandlers) Init(clientCtx crc.ClientCtx) error { + if err := ch.ClientHandlers.Init(clientCtx); nil != err { + return err + } + + if err := occi.ExecServices(ch.Services, occi.ServiceMethodInit, ch.OrderedServices, false); nil != err { + return err + } + + return nil +} + +func (ch *ClientHandlers) OnStart(clientCtx crc.ClientCtx) error { + if err := ch.ClientHandlers.OnStart(clientCtx); nil != err { + return err + } + + if err := occi.ExecServices(ch.Services, occi.ServiceMethodStart, ch.OrderedServices, false); nil != err { + return err + } + + return nil +} + +func (ch *ClientHandlers) OnStop(clientCtx crc.ClientCtx) { + if err := occi.ExecServices(ch.Services, occi.ServiceMethodStop, ch.OrderedServices, true); nil != err { + logging.Logger().Errorf("Container[%s]: Service stop err %v", ch.Name, err) + } + + ch.ClientHandlers.OnStop(clientCtx) +} + +func (ch *ClientHandlers) Destroy(clientCtx crc.ClientCtx) { + if err := occi.ExecServices(ch.Services, occi.ServiceMethodDestroy, ch.OrderedServices, true); nil != err { + logging.Logger().Errorf("Container[%s]: Service destroy err %v", ch.Name, err) + } + + ch.ClientHandlers.Destroy(clientCtx) +} + +func (ch *ClientHandlers) Validate() error { + if nil != ch.validated.Load() { + return nil + } + ch.validated.Store(true) + + if err := ch.ClientHandlers.Validate(); nil != err { + return err + } + + if nil == ch.Services { + return fmt.Errorf("Services is not valid") + } + + if nil == ch.OrderedServices { + return fmt.Errorf("OrderedServices is not valid") + } + + return nil +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..99f1463 --- /dev/null +++ b/client/client.go @@ -0,0 +1,71 @@ +package client + +import ( + "fmt" + "net/url" + "path" + "reflect" + + "git.loafle.net/commons/logging-go" + crc "git.loafle.net/commons/rpc-go/client" + crpj "git.loafle.net/commons/rpc-go/protocol/json" + csswc "git.loafle.net/commons/server-go/socket/web/client" + occc "git.loafle.net/overflow/commons-go/config/container" + occp "git.loafle.net/overflow/commons-go/config/probe" +) + +func New(containerType occp.ContainerType, services []interface{}, orderedServices []reflect.Type, portNumber int) (*crc.Client, error) { + connector, err := newConnector(containerType.String(), portNumber) + if nil != err { + return nil, err + } + + codec := crpj.NewClientCodec() + + var rpcRegistry crr.RPCRegistry + if nil != services && 0 < len(services) { + rpcRegistry = crr.NewRPCRegistry() + rpcRegistry.RegisterServices(services...) + } + + ch := &ClientHandlers{ + Services: services, + OrderedServices: orderedServices, + } + ch.Name = containerType.String() + ch.Connector = connector + ch.RPCCodec = codec + ch.RPCInvoker = rpcRegistry + + return nil, &crc.Client{ + ClientHandler: ch, + } +} + +func newConnector(containerType occp.ContainerType, portNumber int) (*csswc.Connectors, error) { + u := url.URL{ + Scheme: "ws", + Host: fmt.Sprintf("127.0.0.1:%d", port), + } + u.Path = path.Join(u.Path, occc.HTTPEntry_Container) + + connector := &csswc.Connectors{ + Name: containerType.String(), + URL: u.String(), + } + + connector.ReconnectInterval = 5 + connector.ReconnectTryTime = 2 + connector.MaxMessageSize = 4096 + connector.ReadBufferSize = 4096 + connector.WriteBufferSize = 4096 + connector.PongTimeout = 60 + connector.PingTimeout = 10 + connector.PingPeriod = 9 + + connector.OnDisconnected = func(connector csc.Connector) { + logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName()) + } + + return connector, nil +} diff --git a/server/server-handler.go b/server/server-handler.go deleted file mode 100644 index 973982a..0000000 --- a/server/server-handler.go +++ /dev/null @@ -1,119 +0,0 @@ -package server - -import ( - "fmt" - "io/ioutil" - "net" - "os" - "reflect" - "strconv" - - logging "git.loafle.net/commons/logging-go" - "git.loafle.net/commons/server-go" - cssn "git.loafle.net/commons/server-go/socket/net" - occi "git.loafle.net/overflow/commons-go/core/interfaces" -) - -type ServerHandler interface { - cssn.ServerHandler -} - -type ServerHandlers struct { - cssn.ServerHandlers - - PIDFilePath string - Services []interface{} - OrderedServices []reflect.Type - - port int -} - -func (sh *ServerHandlers) Init(serverCtx server.ServerCtx) error { - if err := sh.ServerHandlers.Init(serverCtx); nil != err { - return err - } - - if err := occi.ExecServices(sh.Services, occi.ServiceMethodInit, sh.OrderedServices, false); nil != err { - return err - } - - return nil -} - -func (sh *ServerHandlers) OnStart(serverCtx server.ServerCtx) error { - if err := sh.ServerHandlers.OnStart(serverCtx); nil != err { - return err - } - - if err := occi.ExecServices(sh.Services, occi.ServiceMethodStart, sh.OrderedServices, false); nil != err { - return err - } - - if _, err := os.Stat(sh.PIDFilePath); os.IsExist(err) { - if err := os.Remove(sh.PIDFilePath); nil != err { - logging.Logger().Errorf("Container[%s]: Removing pid file has been failed [%v]", sh.Name, err) - } - } - - s := strconv.FormatInt(int64(sh.port), 10) - if err := ioutil.WriteFile(sh.PIDFilePath, []byte(s), os.ModePerm); nil != err { - return err - } - - return nil -} - -func (sh *ServerHandlers) OnStop(serverCtx server.ServerCtx) { - if err := occi.ExecServices(sh.Services, occi.ServiceMethodStop, sh.OrderedServices, true); nil != err { - logging.Logger().Errorf("Container[%s]: Service stop err %v", sh.Name, err) - } - - if _, err := os.Stat(sh.PIDFilePath); os.IsExist(err) { - if err := os.Remove(sh.PIDFilePath); nil != err { - logging.Logger().Errorf("Container: Removing pid file has been failed [%v]", err) - } - } - - sh.ServerHandlers.OnStop(serverCtx) -} - -func (sh *ServerHandlers) Destroy(serverCtx server.ServerCtx) { - if err := occi.ExecServices(sh.Services, occi.ServiceMethodDestroy, sh.OrderedServices, true); nil != err { - logging.Logger().Errorf("Container[%s]: Service destroy err %v", sh.Name, err) - } - - sh.ServerHandlers.Destroy(serverCtx) -} - -func (sh *ServerHandlers) Listener(serverCtx server.ServerCtx) (net.Listener, error) { - for i := 60000; i < 61000; i++ { - addr := fmt.Sprintf("localhost:%d", i) - l, err := net.Listen("tcp", addr) - if nil == err { - sh.port = i - return l, nil - } - } - - return nil, fmt.Errorf("Container[%s]: Cannot find availrable port", sh.Name) -} - -func (sh *ServerHandlers) Validate() error { - if err := sh.ServerHandlers.Validate(); nil != err { - return err - } - - if "" == sh.PIDFilePath { - return fmt.Errorf("Container[%s]: The path of pid file must be specified", sh.Name) - } - - if nil == sh.Services { - return fmt.Errorf("Container[%s]: Services must be specified", sh.Name) - } - - if nil == sh.OrderedServices { - return fmt.Errorf("Container[%s]: OrderedServices must be specified", sh.Name) - } - - return nil -} diff --git a/service/CollectorService.go b/service/CollectorService.go new file mode 100644 index 0000000..b06874e --- /dev/null +++ b/service/CollectorService.go @@ -0,0 +1,97 @@ +package service + +import ( + "fmt" + "reflect" + "strconv" + "time" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + logging "git.loafle.net/commons/logging-go" + cuts "git.loafle.net/commons/util-go/time/scheduler" + cutss "git.loafle.net/commons/util-go/time/scheduler/storage" + ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig" + ocsp "git.loafle.net/overflow/commons-go/service/probe" + + // For annotation + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var CollectorServiceType = reflect.TypeOf((*CollectorService)(nil)) + +func init() { + cdr.RegisterType(CollectorServiceType) +} + +type CollectorService struct { + ocsp.CollectorService + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` + + ProbeClientService *ProbeClientService `annotation:"@Inject()"` + CrawlerService *CrawlerService `annotation:"@Inject()"` + SensorConfigService *SensorConfigService `annotation:"@Inject()"` + + scheduler *cuts.Scheduler +} + +func (s *CollectorService) InitService() error { + _storage := cutss.NewMemoryStorage() + s.scheduler = cuts.New(_storage) + + return nil +} + +func (s *CollectorService) StartService() error { + if err := s.scheduler.Start(); nil != err { + return err + } + + if err := s.addScheduleAll(); nil != err { + return err + } + return nil +} + +func (s *CollectorService) StopService() { + s.scheduler.Stop() +} + +func (s *CollectorService) DestroyService() { + +} + +func (s *CollectorService) addScheduleAll() error { + sensorConfigs := s.SensorConfigService.sensorConfigs + if nil == sensorConfigs || 0 == len(sensorConfigs) { + return nil + } + + for _, sensorConfig := range sensorConfigs { + interval, err := strconv.ParseInt(sensorConfig.Schedule.Interval, 10, 64) + if nil != err { + return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err) + } + s.addSchedule(interval, sensorConfig) + } + + return nil +} + +func (s *CollectorService) addSchedule(interval int64, sensorConfig *ocmsc.SensorConfig) { + s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig) +} + +func (s *CollectorService) collectTask(sensorConfig *ocmsc.SensorConfig) { + logging.Logger().Debugf("CollectorService.collectTask for sensor config id[%s] of crawler[%s]", sensorConfig.ConfigID, sensorConfig.Crawler.Name) + + result, err := s.CrawlerService.Get(sensorConfig.ConfigID) + if nil != err { + logging.Logger().Errorf("Cannot get data from crawler[%s] %v", sensorConfig.Crawler.Name, err) + return + } + + if err := s.ProbeClientService.Send("DataService.Metric", result); nil != err { + logging.Logger().Errorf("Cannot send data from config id[%s] of crawler[%s] %v", sensorConfig.ConfigID, sensorConfig.Crawler.Name, err) + } +} diff --git a/service/ProbeClientService.go b/service/ProbeClientService.go new file mode 100644 index 0000000..45c9cdb --- /dev/null +++ b/service/ProbeClientService.go @@ -0,0 +1,45 @@ +package service + +import ( + "reflect" + + cda "git.loafle.net/commons/di-go/annotation" + cdr "git.loafle.net/commons/di-go/registry" + crc "git.loafle.net/commons/rpc-go/client" + + // For annotation + _ "git.loafle.net/overflow/commons-go/core/annotation" +) + +var ProbeClientServiceType = reflect.TypeOf((*ProbeClientService)(nil)) + +func init() { + cdr.RegisterType(ProbeClientServiceType) +} + +type ProbeClientService struct { + cda.TypeAnnotation `annotation:"@overflow:RPCService()"` + + Client *crc.Client `annotation:"@Resource(name='CONTAINER_CLIENT')"` +} + +func (s *ProbeClientService) InitService() error { + return nil +} + +func (s *ProbeClientService) StartService() error { + + return nil +} + +func (s *ProbeClientService) StopService() { + +} + +func (s *ProbeClientService) DestroyService() { + +} + +func (s *ProbeClientService) Send(method string, params ...interface{}) error { + return s.Client.Send(method, params...) +} diff --git a/service/ProbeService.go b/service/ProbeService.go deleted file mode 100644 index c6d2c86..0000000 --- a/service/ProbeService.go +++ /dev/null @@ -1,52 +0,0 @@ -package service - -import ( - "fmt" - "reflect" - - cda "git.loafle.net/commons/di-go/annotation" - cdr "git.loafle.net/commons/di-go/registry" - "git.loafle.net/overflow/container-go" - - // For annotation - _ "git.loafle.net/overflow/commons-go/core/annotation" -) - -var ProbeServiceType = reflect.TypeOf((*ProbeService)(nil)) - -func init() { - cdr.RegisterType(ProbeServiceType) -} - -type ProbeService struct { - cda.TypeAnnotation `annotation:"@overflow:RPCService()"` - - RPCWriteChan chan<- *container.RPCNotification `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"` -} - -func (s *ProbeService) InitService() error { - return nil -} - -func (s *ProbeService) StartService() error { - - return nil -} - -func (s *ProbeService) StopService() { - -} - -func (s *ProbeService) DestroyService() { - -} - -func (s *ProbeService) Send(method string, params ...interface{}) error { - select { - case s.RPCWriteChan <- &container.RPCNotification{Method: method, Params: params}: - default: - return fmt.Errorf("cannot write to rpcWriteChan") - } - - return nil -} diff --git a/service/service.go b/service/service.go index bb49d6a..2f8c342 100644 --- a/service/service.go +++ b/service/service.go @@ -4,9 +4,10 @@ import "reflect" var ( OrderedServices = []reflect.Type{ - ProbeServiceType, + ProbeClientServiceType, SensorConfigServiceType, CrawlerServiceType, + CollectorServiceType, } ) diff --git a/servlet/rpc-servlet.go b/servlet/rpc-servlet.go deleted file mode 100644 index c6ef0eb..0000000 --- a/servlet/rpc-servlet.go +++ /dev/null @@ -1,129 +0,0 @@ -package servlet - -import ( - "net" - - "git.loafle.net/commons/logging-go" - crp "git.loafle.net/commons/rpc-go/protocol" - crr "git.loafle.net/commons/rpc-go/registry" - "git.loafle.net/commons/server-go" - css "git.loafle.net/commons/server-go/socket" - cssn "git.loafle.net/commons/server-go/socket/net" - "git.loafle.net/overflow/container-go" -) - -type RPCServlet interface { - cssn.Servlet -} - -type RPCServlets struct { - cssn.Servlets - - RPCServerCodec crp.ServerCodec - RPCInvoker crr.RPCInvoker - RPCWriteChan <-chan *container.RPCNotification -} - -func (s *RPCServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error { - return nil -} - -func (s *RPCServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) { - s.Servlets.OnConnect(servletCtx, conn) - -} - -func (s *RPCServlets) OnDisconnect(servletCtx server.ServletCtx) { - s.Servlets.OnDisconnect(servletCtx) -} - -func (s *RPCServlets) Handle(servletCtx server.ServletCtx, - stopChan <-chan struct{}, doneChan chan<- struct{}, - readChan <-chan []byte, writeChan chan<- []byte) { - defer func() { - doneChan <- struct{}{} - }() - - var ( - src crp.ServerRequestCodec - reply interface{} - replyBuff []byte - err error - ) - - go s.handleRPCWrite(stopChan, writeChan) - - for { - select { - case msg, ok := <-readChan: - if !ok { - return - } - // grpc exec method call - src, err = s.RPCServerCodec.NewRequest(msg) - if nil != err { - logging.Logger().Error(err) - break - } - - reply, err = s.RPCInvoker.Invoke(src) - if !src.HasResponse() { - break - } - replyBuff, err = src.NewResponse(reply, err) - if nil != err { - logging.Logger().Error(err) - s.writeError(src, writeChan, crp.E_INTERNAL, "", err) - break - } - - writeChan <- replyBuff - case <-stopChan: - return - } - } -} - -func (s *RPCServlets) handleRPCWrite(stopChan <-chan struct{}, writeChan chan<- []byte) { - var ( - buf []byte - err error - ) - for { - select { - case noti, ok := <-s.RPCWriteChan: - if !ok { - break - } - - buf, err = s.RPCServerCodec.NewNotification(noti.Method, noti.Params) - if nil != err { - logging.Logger().Error(err) - break - } - - writeChan <- buf - case <-stopChan: - return - } - } -} - -func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- []byte, code crp.ErrorCode, message string, data interface{}) { - if !src.HasResponse() { - return - } - - pErr := &crp.Error{ - Code: code, - Message: message, - Data: data, - } - - buf, err := src.NewResponse(nil, pErr) - if nil != err { - logging.Logger().Error(err) - return - } - writeChan <- buf -}