This commit is contained in:
crusader 2018-05-02 19:59:45 +09:00
parent 7f395b6520
commit 0052da555c
8 changed files with 300 additions and 301 deletions

85
client/client-handler.go Normal file
View File

@ -0,0 +1,85 @@
package client
import (
"fmt"
"reflect"
"sync/atomic"
logging "git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client"
occi "git.loafle.net/overflow/commons-go/core/interfaces"
)
type ClientHandler interface {
crc.ClientHandler
}
type ClientHandlers struct {
crc.ClientHandlers
Services []interface{}
OrderedServices []reflect.Type
validated atomic.Value
}
func (ch *ClientHandlers) Init(clientCtx crc.ClientCtx) error {
if err := ch.ClientHandlers.Init(clientCtx); nil != err {
return err
}
if err := occi.ExecServices(ch.Services, occi.ServiceMethodInit, ch.OrderedServices, false); nil != err {
return err
}
return nil
}
func (ch *ClientHandlers) OnStart(clientCtx crc.ClientCtx) error {
if err := ch.ClientHandlers.OnStart(clientCtx); nil != err {
return err
}
if err := occi.ExecServices(ch.Services, occi.ServiceMethodStart, ch.OrderedServices, false); nil != err {
return err
}
return nil
}
func (ch *ClientHandlers) OnStop(clientCtx crc.ClientCtx) {
if err := occi.ExecServices(ch.Services, occi.ServiceMethodStop, ch.OrderedServices, true); nil != err {
logging.Logger().Errorf("Container[%s]: Service stop err %v", ch.Name, err)
}
ch.ClientHandlers.OnStop(clientCtx)
}
func (ch *ClientHandlers) Destroy(clientCtx crc.ClientCtx) {
if err := occi.ExecServices(ch.Services, occi.ServiceMethodDestroy, ch.OrderedServices, true); nil != err {
logging.Logger().Errorf("Container[%s]: Service destroy err %v", ch.Name, err)
}
ch.ClientHandlers.Destroy(clientCtx)
}
func (ch *ClientHandlers) Validate() error {
if nil != ch.validated.Load() {
return nil
}
ch.validated.Store(true)
if err := ch.ClientHandlers.Validate(); nil != err {
return err
}
if nil == ch.Services {
return fmt.Errorf("Services is not valid")
}
if nil == ch.OrderedServices {
return fmt.Errorf("OrderedServices is not valid")
}
return nil
}

71
client/client.go Normal file
View File

@ -0,0 +1,71 @@
package client
import (
"fmt"
"net/url"
"path"
"reflect"
"git.loafle.net/commons/logging-go"
crc "git.loafle.net/commons/rpc-go/client"
crpj "git.loafle.net/commons/rpc-go/protocol/json"
csswc "git.loafle.net/commons/server-go/socket/web/client"
occc "git.loafle.net/overflow/commons-go/config/container"
occp "git.loafle.net/overflow/commons-go/config/probe"
)
func New(containerType occp.ContainerType, services []interface{}, orderedServices []reflect.Type, portNumber int) (*crc.Client, error) {
connector, err := newConnector(containerType.String(), portNumber)
if nil != err {
return nil, err
}
codec := crpj.NewClientCodec()
var rpcRegistry crr.RPCRegistry
if nil != services && 0 < len(services) {
rpcRegistry = crr.NewRPCRegistry()
rpcRegistry.RegisterServices(services...)
}
ch := &ClientHandlers{
Services: services,
OrderedServices: orderedServices,
}
ch.Name = containerType.String()
ch.Connector = connector
ch.RPCCodec = codec
ch.RPCInvoker = rpcRegistry
return nil, &crc.Client{
ClientHandler: ch,
}
}
func newConnector(containerType occp.ContainerType, portNumber int) (*csswc.Connectors, error) {
u := url.URL{
Scheme: "ws",
Host: fmt.Sprintf("127.0.0.1:%d", port),
}
u.Path = path.Join(u.Path, occc.HTTPEntry_Container)
connector := &csswc.Connectors{
Name: containerType.String(),
URL: u.String(),
}
connector.ReconnectInterval = 5
connector.ReconnectTryTime = 2
connector.MaxMessageSize = 4096
connector.ReadBufferSize = 4096
connector.WriteBufferSize = 4096
connector.PongTimeout = 60
connector.PingTimeout = 10
connector.PingPeriod = 9
connector.OnDisconnected = func(connector csc.Connector) {
logging.Logger().Debugf("Client[%s] has been disconnected", connector.GetName())
}
return connector, nil
}

View File

@ -1,119 +0,0 @@
package server
import (
"fmt"
"io/ioutil"
"net"
"os"
"reflect"
"strconv"
logging "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go"
cssn "git.loafle.net/commons/server-go/socket/net"
occi "git.loafle.net/overflow/commons-go/core/interfaces"
)
type ServerHandler interface {
cssn.ServerHandler
}
type ServerHandlers struct {
cssn.ServerHandlers
PIDFilePath string
Services []interface{}
OrderedServices []reflect.Type
port int
}
func (sh *ServerHandlers) Init(serverCtx server.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err
}
if err := occi.ExecServices(sh.Services, occi.ServiceMethodInit, sh.OrderedServices, false); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStart(serverCtx server.ServerCtx) error {
if err := sh.ServerHandlers.OnStart(serverCtx); nil != err {
return err
}
if err := occi.ExecServices(sh.Services, occi.ServiceMethodStart, sh.OrderedServices, false); nil != err {
return err
}
if _, err := os.Stat(sh.PIDFilePath); os.IsExist(err) {
if err := os.Remove(sh.PIDFilePath); nil != err {
logging.Logger().Errorf("Container[%s]: Removing pid file has been failed [%v]", sh.Name, err)
}
}
s := strconv.FormatInt(int64(sh.port), 10)
if err := ioutil.WriteFile(sh.PIDFilePath, []byte(s), os.ModePerm); nil != err {
return err
}
return nil
}
func (sh *ServerHandlers) OnStop(serverCtx server.ServerCtx) {
if err := occi.ExecServices(sh.Services, occi.ServiceMethodStop, sh.OrderedServices, true); nil != err {
logging.Logger().Errorf("Container[%s]: Service stop err %v", sh.Name, err)
}
if _, err := os.Stat(sh.PIDFilePath); os.IsExist(err) {
if err := os.Remove(sh.PIDFilePath); nil != err {
logging.Logger().Errorf("Container: Removing pid file has been failed [%v]", err)
}
}
sh.ServerHandlers.OnStop(serverCtx)
}
func (sh *ServerHandlers) Destroy(serverCtx server.ServerCtx) {
if err := occi.ExecServices(sh.Services, occi.ServiceMethodDestroy, sh.OrderedServices, true); nil != err {
logging.Logger().Errorf("Container[%s]: Service destroy err %v", sh.Name, err)
}
sh.ServerHandlers.Destroy(serverCtx)
}
func (sh *ServerHandlers) Listener(serverCtx server.ServerCtx) (net.Listener, error) {
for i := 60000; i < 61000; i++ {
addr := fmt.Sprintf("localhost:%d", i)
l, err := net.Listen("tcp", addr)
if nil == err {
sh.port = i
return l, nil
}
}
return nil, fmt.Errorf("Container[%s]: Cannot find availrable port", sh.Name)
}
func (sh *ServerHandlers) Validate() error {
if err := sh.ServerHandlers.Validate(); nil != err {
return err
}
if "" == sh.PIDFilePath {
return fmt.Errorf("Container[%s]: The path of pid file must be specified", sh.Name)
}
if nil == sh.Services {
return fmt.Errorf("Container[%s]: Services must be specified", sh.Name)
}
if nil == sh.OrderedServices {
return fmt.Errorf("Container[%s]: OrderedServices must be specified", sh.Name)
}
return nil
}

View File

@ -0,0 +1,97 @@
package service
import (
"fmt"
"reflect"
"strconv"
"time"
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go"
cuts "git.loafle.net/commons/util-go/time/scheduler"
cutss "git.loafle.net/commons/util-go/time/scheduler/storage"
ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig"
ocsp "git.loafle.net/overflow/commons-go/service/probe"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
)
var CollectorServiceType = reflect.TypeOf((*CollectorService)(nil))
func init() {
cdr.RegisterType(CollectorServiceType)
}
type CollectorService struct {
ocsp.CollectorService
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
ProbeClientService *ProbeClientService `annotation:"@Inject()"`
CrawlerService *CrawlerService `annotation:"@Inject()"`
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
scheduler *cuts.Scheduler
}
func (s *CollectorService) InitService() error {
_storage := cutss.NewMemoryStorage()
s.scheduler = cuts.New(_storage)
return nil
}
func (s *CollectorService) StartService() error {
if err := s.scheduler.Start(); nil != err {
return err
}
if err := s.addScheduleAll(); nil != err {
return err
}
return nil
}
func (s *CollectorService) StopService() {
s.scheduler.Stop()
}
func (s *CollectorService) DestroyService() {
}
func (s *CollectorService) addScheduleAll() error {
sensorConfigs := s.SensorConfigService.sensorConfigs
if nil == sensorConfigs || 0 == len(sensorConfigs) {
return nil
}
for _, sensorConfig := range sensorConfigs {
interval, err := strconv.ParseInt(sensorConfig.Schedule.Interval, 10, 64)
if nil != err {
return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err)
}
s.addSchedule(interval, sensorConfig)
}
return nil
}
func (s *CollectorService) addSchedule(interval int64, sensorConfig *ocmsc.SensorConfig) {
s.scheduler.RunEvery(time.Duration(interval)*time.Second, s.collectTask, sensorConfig)
}
func (s *CollectorService) collectTask(sensorConfig *ocmsc.SensorConfig) {
logging.Logger().Debugf("CollectorService.collectTask for sensor config id[%s] of crawler[%s]", sensorConfig.ConfigID, sensorConfig.Crawler.Name)
result, err := s.CrawlerService.Get(sensorConfig.ConfigID)
if nil != err {
logging.Logger().Errorf("Cannot get data from crawler[%s] %v", sensorConfig.Crawler.Name, err)
return
}
if err := s.ProbeClientService.Send("DataService.Metric", result); nil != err {
logging.Logger().Errorf("Cannot send data from config id[%s] of crawler[%s] %v", sensorConfig.ConfigID, sensorConfig.Crawler.Name, err)
}
}

View File

@ -0,0 +1,45 @@
package service
import (
"reflect"
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
crc "git.loafle.net/commons/rpc-go/client"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
)
var ProbeClientServiceType = reflect.TypeOf((*ProbeClientService)(nil))
func init() {
cdr.RegisterType(ProbeClientServiceType)
}
type ProbeClientService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
Client *crc.Client `annotation:"@Resource(name='CONTAINER_CLIENT')"`
}
func (s *ProbeClientService) InitService() error {
return nil
}
func (s *ProbeClientService) StartService() error {
return nil
}
func (s *ProbeClientService) StopService() {
}
func (s *ProbeClientService) DestroyService() {
}
func (s *ProbeClientService) Send(method string, params ...interface{}) error {
return s.Client.Send(method, params...)
}

View File

@ -1,52 +0,0 @@
package service
import (
"fmt"
"reflect"
cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry"
"git.loafle.net/overflow/container-go"
// For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation"
)
var ProbeServiceType = reflect.TypeOf((*ProbeService)(nil))
func init() {
cdr.RegisterType(ProbeServiceType)
}
type ProbeService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
RPCWriteChan chan<- *container.RPCNotification `annotation:"@Resource(name='CONTAINER_RPC_WRITE_CHAN')"`
}
func (s *ProbeService) InitService() error {
return nil
}
func (s *ProbeService) StartService() error {
return nil
}
func (s *ProbeService) StopService() {
}
func (s *ProbeService) DestroyService() {
}
func (s *ProbeService) Send(method string, params ...interface{}) error {
select {
case s.RPCWriteChan <- &container.RPCNotification{Method: method, Params: params}:
default:
return fmt.Errorf("cannot write to rpcWriteChan")
}
return nil
}

View File

@ -4,9 +4,10 @@ import "reflect"
var (
OrderedServices = []reflect.Type{
ProbeServiceType,
ProbeClientServiceType,
SensorConfigServiceType,
CrawlerServiceType,
CollectorServiceType,
}
)

View File

@ -1,129 +0,0 @@
package servlet
import (
"net"
"git.loafle.net/commons/logging-go"
crp "git.loafle.net/commons/rpc-go/protocol"
crr "git.loafle.net/commons/rpc-go/registry"
"git.loafle.net/commons/server-go"
css "git.loafle.net/commons/server-go/socket"
cssn "git.loafle.net/commons/server-go/socket/net"
"git.loafle.net/overflow/container-go"
)
type RPCServlet interface {
cssn.Servlet
}
type RPCServlets struct {
cssn.Servlets
RPCServerCodec crp.ServerCodec
RPCInvoker crr.RPCInvoker
RPCWriteChan <-chan *container.RPCNotification
}
func (s *RPCServlets) Handshake(servletCtx server.ServletCtx, conn net.Conn) error {
return nil
}
func (s *RPCServlets) OnConnect(servletCtx server.ServletCtx, conn css.Conn) {
s.Servlets.OnConnect(servletCtx, conn)
}
func (s *RPCServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.Servlets.OnDisconnect(servletCtx)
}
func (s *RPCServlets) Handle(servletCtx server.ServletCtx,
stopChan <-chan struct{}, doneChan chan<- struct{},
readChan <-chan []byte, writeChan chan<- []byte) {
defer func() {
doneChan <- struct{}{}
}()
var (
src crp.ServerRequestCodec
reply interface{}
replyBuff []byte
err error
)
go s.handleRPCWrite(stopChan, writeChan)
for {
select {
case msg, ok := <-readChan:
if !ok {
return
}
// grpc exec method call
src, err = s.RPCServerCodec.NewRequest(msg)
if nil != err {
logging.Logger().Error(err)
break
}
reply, err = s.RPCInvoker.Invoke(src)
if !src.HasResponse() {
break
}
replyBuff, err = src.NewResponse(reply, err)
if nil != err {
logging.Logger().Error(err)
s.writeError(src, writeChan, crp.E_INTERNAL, "", err)
break
}
writeChan <- replyBuff
case <-stopChan:
return
}
}
}
func (s *RPCServlets) handleRPCWrite(stopChan <-chan struct{}, writeChan chan<- []byte) {
var (
buf []byte
err error
)
for {
select {
case noti, ok := <-s.RPCWriteChan:
if !ok {
break
}
buf, err = s.RPCServerCodec.NewNotification(noti.Method, noti.Params)
if nil != err {
logging.Logger().Error(err)
break
}
writeChan <- buf
case <-stopChan:
return
}
}
}
func (s *RPCServlets) writeError(src crp.ServerRequestCodec, writeChan chan<- []byte, code crp.ErrorCode, message string, data interface{}) {
if !src.HasResponse() {
return
}
pErr := &crp.Error{
Code: code,
Message: message,
Data: data,
}
buf, err := src.NewResponse(nil, pErr)
if nil != err {
logging.Logger().Error(err)
return
}
writeChan <- buf
}