This commit is contained in:
geek 2018-07-02 17:44:53 +09:00
parent c4797016b9
commit d8d6198c44
14 changed files with 115 additions and 92 deletions

1
.gitignore vendored
View File

@ -66,3 +66,4 @@ glide.lock
.DS_Store .DS_Store
dist/ dist/
debug debug
Gopkg.lock

9
Gopkg.lock generated
View File

@ -135,22 +135,13 @@
[[projects]] [[projects]]
name = "github.com/shirou/gopsutil" name = "github.com/shirou/gopsutil"
packages = [ packages = [
"cpu",
"host", "host",
"internal/common", "internal/common",
"mem",
"net",
"process" "process"
] ]
revision = "4a180b209f5f494e5923cfce81ea30ba23915877" revision = "4a180b209f5f494e5923cfce81ea30ba23915877"
version = "v2.18.06" version = "v2.18.06"
[[projects]]
branch = "master"
name = "github.com/shirou/w32"
packages = ["."]
revision = "bb4de0191aa41b5507caa14b0650cdbddcd9280b"
[[projects]] [[projects]]
name = "github.com/valyala/fasthttp" name = "github.com/valyala/fasthttp"
packages = [ packages = [

View File

@ -1,4 +0,0 @@
{
"tempKey": "e5f6a72149dc11e890a40242ac120004",
"acceptedDate": 1524807456000
}

View File

@ -4,7 +4,7 @@
"apiKey": "52abd6fd57e511e7ac52080027658d13" "apiKey": "52abd6fd57e511e7ac52080027658d13"
}, },
"central": { "central": {
"address": "192.168.1.101:19100", "address": "192.168.1.103:19100",
"connector": { "connector": {
"reconnectInterval": 5, "reconnectInterval": 5,
"reconnectTryTime": 10, "reconnectTryTime": 10,

View File

@ -1,39 +0,0 @@
{
"configID" : "989238744",
"target" : {
"connection" : {
"ip" : "192.168.1.50",
"port" : "5432",
"ssl" : false,
"portType" : "tcp"
},
"auth" : {
"url":"jdbc:postgresql://192.168.1.50:5432/overflow",
"id":"overflow",
"pw":"qwer5795"
},
"meta": {
}
},
"schedule" : {
"interval" : "3"
},
"crawler" : {
"name":"POSTGRESQL",
"container":"GENERAL"
},
"items" : [
{
"keys" : [
{
"metric":"net.pgsql.connection_count",
"key" : "connection_count"
}
],
"queryInfo":{
"query" : "select count(pid) as connection_count from pg_catalog.pg_stat_activity where state <> 'idle';"
},
"mappingInfo" : {}
}
]
}

0
_build/config_/_ Normal file
View File

View File

@ -11,5 +11,5 @@ func SensorConfigContainerDir(name string) string {
} }
func SensorConfigFilePath(sensorConfig *ocmsc.SensorConfig) string { func SensorConfigFilePath(sensorConfig *ocmsc.SensorConfig) string {
return path.Join(SensorConfigContainerDir(sensorConfig.Crawler.Container), sensorConfig.Crawler.Name, sensorConfig.ID.String()) return path.Join(SensorConfigContainerDir(sensorConfig.Crawler.MetaCrawlerContainerKey), sensorConfig.SensorID.String())
} }

View File

@ -3,7 +3,6 @@ package service
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strconv"
"time" "time"
cda "git.loafle.net/commons/di-go/annotation" cda "git.loafle.net/commons/di-go/annotation"
@ -65,13 +64,13 @@ func (s *CollectorService) addScheduleAll() error {
return nil return nil
} }
for _, sensorConfig := range sensorConfigs { //for _, sensorConfig := range sensorConfigs {
interval, err := strconv.ParseInt(sensorConfig.Schedule.Interval, 10, 64) // interval, err := strconv.ParseInt(sensorConfig.Schedule.Interval, 10, 64)
if nil != err { // if nil != err {
return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err) // return fmt.Errorf("Cannot convert interval[%s] %v", sensorConfig.Schedule.Interval, err)
} // }
s.addSchedule(interval, sensorConfig) // s.addSchedule(interval, sensorConfig)
} //}
return nil return nil
} }

View File

@ -49,14 +49,14 @@ func (s *MetricService) StartService() error {
return fmt.Errorf("MetricService: StartService failed %v", err) return fmt.Errorf("MetricService: StartService failed %v", err)
} }
s.connector = connector //s.connector = connector
//
readChan, writeChan, err := s.connector.Connect() //readChan, writeChan, err := s.connector.Connect()
if nil != err { //if nil != err {
return fmt.Errorf("MetricService: StartService failed %v", err) // return fmt.Errorf("MetricService: StartService failed %v", err)
} //}
s.readChan = readChan //s.readChan = readChan
s.writeChan = writeChan //s.writeChan = writeChan
return nil return nil
} }

View File

@ -7,10 +7,11 @@ import (
"os" "os"
"path" "path"
"reflect" "reflect"
"compress/gzip"
cda "git.loafle.net/commons/di-go/annotation" cda "git.loafle.net/commons/di-go/annotation"
cdr "git.loafle.net/commons/di-go/registry" cdr "git.loafle.net/commons/di-go/registry"
logging "git.loafle.net/commons/logging-go" "git.loafle.net/commons/logging-go"
occp "git.loafle.net/overflow/commons-go/config/probe" occp "git.loafle.net/overflow/commons-go/config/probe"
ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig" ocmsc "git.loafle.net/overflow/commons-go/model/sensorconfig"
ocsp "git.loafle.net/overflow/commons-go/service/probe" ocsp "git.loafle.net/overflow/commons-go/service/probe"
@ -18,6 +19,9 @@ import (
// For annotation // For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation" _ "git.loafle.net/overflow/commons-go/core/annotation"
"crypto/cipher"
"crypto/des"
"bytes"
) )
var SensorConfigServiceType = reflect.TypeOf((*SensorConfigService)(nil)) var SensorConfigServiceType = reflect.TypeOf((*SensorConfigService)(nil))
@ -31,6 +35,7 @@ type SensorConfigService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"` cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
ContainerService *ContainerService `annotation:"@Inject()"` ContainerService *ContainerService `annotation:"@Inject()"`
ProbeClientService *ProbeClientService `annotation:"@Inject()"`
sensorConfigs map[string]*ocmsc.SensorConfig sensorConfigs map[string]*ocmsc.SensorConfig
sensorConfigsPerContainer map[occp.ContainerType][]*ocmsc.SensorConfig sensorConfigsPerContainer map[occp.ContainerType][]*ocmsc.SensorConfig
@ -80,7 +85,9 @@ func (s *SensorConfigService) SendInitConfig(containerType occp.ContainerType) e
} }
func (s *SensorConfigService) AddConfig(tempFilePath string) error { func (s *SensorConfigService) AddConfig(tempFilePath string) error {
sc, buf, err := s.loadConfigFile(tempFilePath) //sc, buf, err := s.loadConfigFile(tempFilePath)
sc, buf, err := s.loadConfigFile1(tempFilePath)
if nil != err { if nil != err {
return err return err
} }
@ -94,12 +101,12 @@ func (s *SensorConfigService) AddConfig(tempFilePath string) error {
return err return err
} }
s.sensorConfigs[sc.ID.String()] = sc //s.sensorConfigs[sc.ID.String()] = sc
s.sortSensorConfigPerContainer() //s.sortSensorConfigPerContainer()
//
if err := s.ContainerService.Send(occp.ToContainerType(sc.Crawler.Container), "SensorConfigService.AddConfig", sc); nil != err { //if err := s.ContainerService.Send(occp.ToContainerType(sc.Crawler.Container), "SensorConfigService.AddConfig", sc); nil != err {
return err // return err
} //}
return nil return nil
} }
@ -119,13 +126,13 @@ func (s *SensorConfigService) UpdateConfig(tempFilePath string) error {
return err return err
} }
delete(s.sensorConfigs, sc.ConfigID) //delete(s.sensorConfigs, sc.ConfigID)
s.sensorConfigs[sc.ID.String()] = sc //s.sensorConfigs[sc.ID.String()] = sc
s.sortSensorConfigPerContainer() //s.sortSensorConfigPerContainer()
//
if err := s.ContainerService.Send(occp.ToContainerType(sc.Crawler.Container), "SensorConfigService.UpdateConfig", sc); nil != err { //if err := s.ContainerService.Send(occp.ToContainerType(sc.Crawler.Container), "SensorConfigService.UpdateConfig", sc); nil != err {
return err // return err
} //}
return nil return nil
} }
@ -145,9 +152,9 @@ func (s *SensorConfigService) RemoveConfig(sensorConfigID string) error {
delete(s.sensorConfigs, sensorConfigID) delete(s.sensorConfigs, sensorConfigID)
s.sortSensorConfigPerContainer() s.sortSensorConfigPerContainer()
if err := s.ContainerService.Send(occp.ToContainerType(sc.Crawler.Container), "SensorConfigService.RemoveConfig", sensorConfigID); nil != err { //if err := s.ContainerService.Send(occp.ToContainerType(sc.Crawler.Container), "SensorConfigService.RemoveConfig", sensorConfigID); nil != err {
return err // return err
} //}
return nil return nil
} }
@ -198,10 +205,14 @@ func (s *SensorConfigService) loadConfigDir(dirPath string) error {
func (s *SensorConfigService) loadConfigFile(filePath string) (*ocmsc.SensorConfig, []byte, error) { func (s *SensorConfigService) loadConfigFile(filePath string) (*ocmsc.SensorConfig, []byte, error) {
logging.Logger().Debugf("filePath: %s", filePath) logging.Logger().Debugf("filePath: %s", filePath)
buf, err := ioutil.ReadFile(filePath) buf, err := ioutil.ReadFile(filePath)
if nil != err { if nil != err {
return nil, nil, err return nil, nil, err
} }
// probe ecncreiption key decode
// gzip decode
var m = &ocmsc.SensorConfig{} var m = &ocmsc.SensorConfig{}
if err := json.Unmarshal(buf, m); nil != err { if err := json.Unmarshal(buf, m); nil != err {
return nil, nil, err return nil, nil, err
@ -210,6 +221,40 @@ func (s *SensorConfigService) loadConfigFile(filePath string) (*ocmsc.SensorConf
return m, buf, nil return m, buf, nil
} }
func (s *SensorConfigService) loadConfigFile1(decodeStr string) (*ocmsc.SensorConfig, []byte, error) {
//keyStr := s.ProbeClientService.EncryptionKey
keyStr := "$2a$10$rPZYValfYvsVb8oDDz7gkuuf1FpqVnG8xbxQa.NhKdH7WDi.H2UD."
key := []byte(keyStr)
iv := []byte(keyStr)
block, err := des.NewCipher(key)
if nil != err {
return nil, nil, err
}
blockMode := cipher.NewCBCDecrypter(block, iv)
origData := make([]byte, len(decodeStr))
blockMode.CryptBlocks(origData, []byte(decodeStr))
origData = s.PKCS5UnPadding(origData)
gr, err := gzip.NewReader(bytes.NewBuffer(origData))
defer gr.Close()
data, err := ioutil.ReadAll(gr)
var m = &ocmsc.SensorConfig{}
if err := json.Unmarshal(data, m); nil != err {
return nil, nil, err
}
fmt.Println(string(data))
return m, data, nil
}
func (s *SensorConfigService) PKCS5UnPadding(src []byte) []byte {
length := len(src)
unpadding := int(src[length-1])
return src[:(length - unpadding)]
}
func (s *SensorConfigService) sortSensorConfigPerContainer() { func (s *SensorConfigService) sortSensorConfigPerContainer() {
if nil == s.sensorConfigs || 0 == len(s.sensorConfigs) { if nil == s.sensorConfigs || 0 == len(s.sensorConfigs) {
return return
@ -218,8 +263,8 @@ func (s *SensorConfigService) sortSensorConfigPerContainer() {
s.sensorConfigsPerContainer = nil s.sensorConfigsPerContainer = nil
s.sensorConfigsPerContainer = make(map[occp.ContainerType][]*ocmsc.SensorConfig) s.sensorConfigsPerContainer = make(map[occp.ContainerType][]*ocmsc.SensorConfig)
for _, sensorConfig := range s.sensorConfigs { //for _, sensorConfig := range s.sensorConfigs {
containerType := occp.ToContainerType(sensorConfig.Crawler.Container) // containerType := occp.ToContainerType(sensorConfig.Crawler.Container)
s.sensorConfigsPerContainer[containerType] = append(s.sensorConfigsPerContainer[containerType], sensorConfig) // s.sensorConfigsPerContainer[containerType] = append(s.sensorConfigsPerContainer[containerType], sensorConfig)
} //}
} }

View File

@ -1,6 +1,7 @@
package service package service
import ( import (
b64 "encoding/base64"
"reflect" "reflect"
cda "git.loafle.net/commons/di-go/annotation" cda "git.loafle.net/commons/di-go/annotation"
@ -8,6 +9,7 @@ import (
// For annotation // For annotation
_ "git.loafle.net/overflow/commons-go/core/annotation" _ "git.loafle.net/overflow/commons-go/core/annotation"
) )
var SensorServiceType = reflect.TypeOf((*SensorService)(nil)) var SensorServiceType = reflect.TypeOf((*SensorService)(nil))
@ -20,6 +22,7 @@ type SensorService struct {
cda.TypeAnnotation `annotation:"@overflow:RPCService()"` cda.TypeAnnotation `annotation:"@overflow:RPCService()"`
ContainerService *ContainerService `annotation:"@Inject()"` ContainerService *ContainerService `annotation:"@Inject()"`
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
} }
func (s *SensorService) InitService() error { func (s *SensorService) InitService() error {
@ -50,6 +53,16 @@ func (s *SensorService) StopSensor(id int64) error {
func (s *SensorService) AddSensor(sensorConfigBase64 string) error { func (s *SensorService) AddSensor(sensorConfigBase64 string) error {
// base64 decode
bufByte, err := b64.StdEncoding.DecodeString(sensorConfigBase64)
if nil == bufByte || nil != err {
return err
}
s.SensorConfigService.AddConfig(string(bufByte[:]))
return nil return nil
} }

View File

@ -0,0 +1,17 @@
package service
import (
"testing"
"fmt"
)
func TestSensorService_AddSensor(t *testing.T) {
config := "o8rZu/ncU9dJ+0ryZ9fhy/WfBuSPHBlcdXMMvF8zEcAvRLlS/7b2iUduR9Z+8cGNPR8+fCvPNwgQb2JE2iHeQU6Dno3niWsp7sKC8KYpSPdB3B7QsqQ7Z4Svd4spmDgdxvggwceql1oFAfahQwGGnMxw8bt/phogtiDwo/jhDGGjfXvp9H4nlTjRIkHOC8w3pWWtjbvyb5ELn53YnaiQmzkFSUyz1Cs0IQzDorgvogz2rwmYmOw5kCepFDdnPVgtAsoxrhKrzg92SuyKcrZWGoPjgsl2PJ1ywojwEBgQ96cHIa0mxdC43qTrYbZ5o9+LXdIW9mewvfse8O+/yx8XzOHkZlONC9RtAIcYgoHiJS40ITp0XO7seg=="
s := &SensorService{}
err := s.AddSensor(config)
if nil != err {
fmt.Errorf(err.Error())
}
}