ing
This commit is contained in:
parent
ca13dea9cd
commit
a3ad29ff73
340
commons/scheduler/Job.go
Normal file
340
commons/scheduler/Job.go
Normal file
|
@ -0,0 +1,340 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
const MAXJOBNUM = 10000
|
||||
|
||||
var _location = time.Local
|
||||
|
||||
type Job interface {
|
||||
Do(jobFun interface{}, params ...interface{})
|
||||
At(t string) Job
|
||||
NextScheduledTime() time.Time
|
||||
Second() (job Job)
|
||||
Seconds() (job Job)
|
||||
Minute() (job Job)
|
||||
Minutes() (job Job)
|
||||
Hour() (job Job)
|
||||
Hours() (job Job)
|
||||
Day() (job Job)
|
||||
Days() (job Job)
|
||||
Monday() (job Job)
|
||||
Tuesday() (job Job)
|
||||
Wednesday() (job Job)
|
||||
Thursday() (job Job)
|
||||
Friday() (job Job)
|
||||
Saturday() (job Job)
|
||||
Sunday() (job Job)
|
||||
Weeks() (job Job)
|
||||
|
||||
shouldRun() bool
|
||||
run() (result []reflect.Value, err error)
|
||||
scheduleNextRun()
|
||||
nextRun() time.Time
|
||||
jobFunc() string
|
||||
}
|
||||
|
||||
func NewJob(intervel uint64) Job {
|
||||
return &innerJob{
|
||||
intervel,
|
||||
"", "", "",
|
||||
time.Unix(0, 0),
|
||||
time.Unix(0, 0), 0,
|
||||
time.Sunday,
|
||||
make(map[string]interface{}),
|
||||
make(map[string]([]interface{})),
|
||||
}
|
||||
}
|
||||
|
||||
type innerJob struct {
|
||||
// pause interval * unit bettween runs
|
||||
interval uint64
|
||||
|
||||
// the job _jobFunc to run, func[_jobFunc]
|
||||
_jobFunc string
|
||||
// time units, ,e.g. 'minutes', 'hours'...
|
||||
unit string
|
||||
// optional time at which this job runs
|
||||
atTime string
|
||||
|
||||
// datetime of last run
|
||||
lastRun time.Time
|
||||
// datetime of next run
|
||||
_nextRun time.Time
|
||||
// cache the period between last an next run
|
||||
period time.Duration
|
||||
|
||||
// Specific day of the week to start on
|
||||
startDay time.Weekday
|
||||
|
||||
// Map for the function task store
|
||||
funcs map[string]interface{}
|
||||
|
||||
// Map for function and params of function
|
||||
fparams map[string]([]interface{})
|
||||
}
|
||||
|
||||
// True if the job should be run now
|
||||
func (j *innerJob) shouldRun() bool {
|
||||
return time.Now().After(j._nextRun)
|
||||
}
|
||||
|
||||
// True if the job should be run now
|
||||
func (j *innerJob) nextRun() time.Time {
|
||||
return j._nextRun
|
||||
}
|
||||
|
||||
// True if the job should be run now
|
||||
func (j *innerJob) jobFunc() string {
|
||||
return j._jobFunc
|
||||
}
|
||||
|
||||
//Run the job and immediately reschedule it
|
||||
func (j *innerJob) run() (result []reflect.Value, err error) {
|
||||
f := reflect.ValueOf(j.funcs[j._jobFunc])
|
||||
params := j.fparams[j._jobFunc]
|
||||
if len(params) != f.Type().NumIn() {
|
||||
err = errors.New("the number of param is not adapted")
|
||||
return
|
||||
}
|
||||
in := make([]reflect.Value, len(params))
|
||||
for k, param := range params {
|
||||
in[k] = reflect.ValueOf(param)
|
||||
}
|
||||
result = f.Call(in)
|
||||
j.lastRun = time.Now()
|
||||
j.scheduleNextRun()
|
||||
return
|
||||
}
|
||||
|
||||
func (j *innerJob) Do(jobFun interface{}, params ...interface{}) {
|
||||
typ := reflect.TypeOf(jobFun)
|
||||
if typ.Kind() != reflect.Func {
|
||||
panic("only function can be schedule into the job queue.")
|
||||
}
|
||||
|
||||
fname := getFunctionName(jobFun)
|
||||
j.funcs[fname] = jobFun
|
||||
j.fparams[fname] = params
|
||||
j._jobFunc = fname
|
||||
//schedule the next run
|
||||
j.scheduleNextRun()
|
||||
}
|
||||
|
||||
// s.Every(1).Day().At("10:30").Do(task)
|
||||
// s.Every(1).Monday().At("10:30").Do(task)
|
||||
func (j *innerJob) At(t string) Job {
|
||||
hour, min, err := formatTime(t)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
|
||||
mock := time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day(), int(hour), int(min), 0, 0, _location)
|
||||
|
||||
if j.unit == "days" {
|
||||
if time.Now().After(mock) {
|
||||
j.lastRun = mock
|
||||
} else {
|
||||
j.lastRun = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()-1, hour, min, 0, 0, _location)
|
||||
}
|
||||
} else if j.unit == "weeks" {
|
||||
if j.startDay != time.Now().Weekday() || (time.Now().After(mock) && j.startDay == time.Now().Weekday()) {
|
||||
i := mock.Weekday() - j.startDay
|
||||
if i < 0 {
|
||||
i = 7 + i
|
||||
}
|
||||
j.lastRun = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()-int(i), hour, min, 0, 0, _location)
|
||||
} else {
|
||||
j.lastRun = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()-7, hour, min, 0, 0, _location)
|
||||
}
|
||||
}
|
||||
return j
|
||||
}
|
||||
|
||||
//Compute the instant when this job should run next
|
||||
func (j *innerJob) scheduleNextRun() {
|
||||
if j.lastRun == time.Unix(0, 0) {
|
||||
if j.unit == "weeks" {
|
||||
i := time.Now().Weekday() - j.startDay
|
||||
if i < 0 {
|
||||
i = 7 + i
|
||||
}
|
||||
j.lastRun = time.Date(time.Now().Year(), time.Now().Month(), time.Now().Day()-int(i), 0, 0, 0, 0, _location)
|
||||
|
||||
} else {
|
||||
j.lastRun = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
if j.period != 0 {
|
||||
// translate all the units to the Seconds
|
||||
j._nextRun = j.lastRun.Add(j.period * time.Second)
|
||||
} else {
|
||||
switch j.unit {
|
||||
case "minutes":
|
||||
j.period = time.Duration(j.interval * 60)
|
||||
break
|
||||
case "hours":
|
||||
j.period = time.Duration(j.interval * 60 * 60)
|
||||
break
|
||||
case "days":
|
||||
j.period = time.Duration(j.interval * 60 * 60 * 24)
|
||||
break
|
||||
case "weeks":
|
||||
j.period = time.Duration(j.interval * 60 * 60 * 24 * 7)
|
||||
break
|
||||
case "seconds":
|
||||
j.period = time.Duration(j.interval)
|
||||
}
|
||||
j._nextRun = j.lastRun.Add(j.period * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// NextScheduledTime returns the time of when this job is to run next
|
||||
func (j *innerJob) NextScheduledTime() time.Time {
|
||||
return j._nextRun
|
||||
}
|
||||
|
||||
// the follow functions set the job's unit with seconds,minutes,hours...
|
||||
|
||||
// Set the unit with second
|
||||
func (j *innerJob) Second() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
job = j.Seconds()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the unit with seconds
|
||||
func (j *innerJob) Seconds() (job Job) {
|
||||
j.unit = "seconds"
|
||||
return j
|
||||
}
|
||||
|
||||
// Set the unit with minute, which interval is 1
|
||||
func (j *innerJob) Minute() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
job = j.Minutes()
|
||||
return
|
||||
}
|
||||
|
||||
//set the unit with minute
|
||||
func (j *innerJob) Minutes() (job Job) {
|
||||
j.unit = "minutes"
|
||||
return j
|
||||
}
|
||||
|
||||
//set the unit with hour, which interval is 1
|
||||
func (j *innerJob) Hour() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
job = j.Hours()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the unit with hours
|
||||
func (j *innerJob) Hours() (job Job) {
|
||||
j.unit = "hours"
|
||||
return j
|
||||
}
|
||||
|
||||
// Set the job's unit with day, which interval is 1
|
||||
func (j *innerJob) Day() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
job = j.Days()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the job's unit with days
|
||||
func (j *innerJob) Days() Job {
|
||||
j.unit = "days"
|
||||
return j
|
||||
}
|
||||
|
||||
// s.Every(1).Monday().Do(task)
|
||||
// Set the start day with Monday
|
||||
func (j *innerJob) Monday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 1
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the start day with Tuesday
|
||||
func (j *innerJob) Tuesday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 2
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the start day woth Wednesday
|
||||
func (j *innerJob) Wednesday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 3
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the start day with thursday
|
||||
func (j *innerJob) Thursday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 4
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the start day with friday
|
||||
func (j *innerJob) Friday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 5
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the start day with saturday
|
||||
func (j *innerJob) Saturday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 6
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
// Set the start day with sunday
|
||||
func (j *innerJob) Sunday() (job Job) {
|
||||
if j.interval != 1 {
|
||||
panic("")
|
||||
}
|
||||
j.startDay = 0
|
||||
job = j.Weeks()
|
||||
return
|
||||
}
|
||||
|
||||
//Set the units as weeks
|
||||
func (j *innerJob) Weeks() Job {
|
||||
j.unit = "weeks"
|
||||
return j
|
||||
}
|
152
commons/scheduler/Scheduler.go
Normal file
152
commons/scheduler/Scheduler.go
Normal file
|
@ -0,0 +1,152 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Scheduler interface {
|
||||
Len() int
|
||||
Swap(i, j int)
|
||||
Less(i, j int) bool
|
||||
NextRun() (Job, time.Time)
|
||||
Every(interval uint64) Job
|
||||
RunPending()
|
||||
RunAll()
|
||||
RunAllwithDelay(d int)
|
||||
Remove(j interface{})
|
||||
Clear()
|
||||
Start() chan bool
|
||||
|
||||
getRunnableJobs() (running_jobs [MAXJOBNUM]Job, n int)
|
||||
}
|
||||
|
||||
// Create a new scheduler
|
||||
func NewScheduler() Scheduler {
|
||||
return &innerScheduler{[MAXJOBNUM]Job{}, 0}
|
||||
}
|
||||
|
||||
type innerScheduler struct {
|
||||
// Array store jobs
|
||||
jobs [MAXJOBNUM]Job
|
||||
|
||||
// Size of jobs which jobs holding.
|
||||
size int
|
||||
}
|
||||
|
||||
func (s *innerScheduler) Len() int {
|
||||
return s.size
|
||||
}
|
||||
|
||||
func (s *innerScheduler) Swap(i, j int) {
|
||||
s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i]
|
||||
}
|
||||
|
||||
func (s *innerScheduler) Less(i, j int) bool {
|
||||
return s.jobs[j].nextRun().After(s.jobs[i].nextRun())
|
||||
}
|
||||
|
||||
// Get the current runnable jobs, which shouldRun is True
|
||||
func (s *innerScheduler) getRunnableJobs() (running_jobs [MAXJOBNUM]Job, n int) {
|
||||
runnableJobs := [MAXJOBNUM]Job{}
|
||||
n = 0
|
||||
sort.Sort(s)
|
||||
for i := 0; i < s.size; i++ {
|
||||
if s.jobs[i].shouldRun() {
|
||||
|
||||
runnableJobs[n] = s.jobs[i]
|
||||
//fmt.Println(runnableJobs)
|
||||
n++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return runnableJobs, n
|
||||
}
|
||||
|
||||
// Datetime when the next job should run.
|
||||
func (s *innerScheduler) NextRun() (Job, time.Time) {
|
||||
if s.size <= 0 {
|
||||
return nil, time.Now()
|
||||
}
|
||||
sort.Sort(s)
|
||||
return s.jobs[0], s.jobs[0].nextRun()
|
||||
}
|
||||
|
||||
// Schedule a new periodic job
|
||||
func (s *innerScheduler) Every(interval uint64) Job {
|
||||
job := NewJob(interval)
|
||||
s.jobs[s.size] = job
|
||||
s.size++
|
||||
return job
|
||||
}
|
||||
|
||||
// Run all the jobs that are scheduled to run.
|
||||
func (s *innerScheduler) RunPending() {
|
||||
runnableJobs, n := s.getRunnableJobs()
|
||||
|
||||
if n != 0 {
|
||||
for i := 0; i < n; i++ {
|
||||
runnableJobs[i].run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run all jobs regardless if they are scheduled to run or not
|
||||
func (s *innerScheduler) RunAll() {
|
||||
for i := 0; i < s.size; i++ {
|
||||
s.jobs[i].run()
|
||||
}
|
||||
}
|
||||
|
||||
// Run all jobs with delay seconds
|
||||
func (s *innerScheduler) RunAllwithDelay(d int) {
|
||||
for i := 0; i < s.size; i++ {
|
||||
s.jobs[i].run()
|
||||
time.Sleep(time.Duration(d))
|
||||
}
|
||||
}
|
||||
|
||||
// Remove specific job j
|
||||
func (s *innerScheduler) Remove(j interface{}) {
|
||||
i := 0
|
||||
for ; i < s.size; i++ {
|
||||
if s.jobs[i].jobFunc() == getFunctionName(j) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for j := (i + 1); j < s.size; j++ {
|
||||
s.jobs[i] = s.jobs[j]
|
||||
i++
|
||||
}
|
||||
s.size = s.size - 1
|
||||
}
|
||||
|
||||
// Delete all scheduled jobs
|
||||
func (s *innerScheduler) Clear() {
|
||||
for i := 0; i < s.size; i++ {
|
||||
s.jobs[i] = nil
|
||||
}
|
||||
s.size = 0
|
||||
}
|
||||
|
||||
// Start all the pending jobs
|
||||
// Add seconds ticker
|
||||
func (s *innerScheduler) Start() chan bool {
|
||||
stopped := make(chan bool, 1)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.RunPending()
|
||||
case <-stopped:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return stopped
|
||||
}
|
55
commons/scheduler/default.go
Normal file
55
commons/scheduler/default.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package scheduler
|
||||
|
||||
import "time"
|
||||
|
||||
var defaultScheduler = NewScheduler()
|
||||
|
||||
// Schedule a new periodic job
|
||||
func Every(interval uint64) Job {
|
||||
return defaultScheduler.Every(interval)
|
||||
}
|
||||
|
||||
// Run all jobs that are scheduled to run
|
||||
//
|
||||
// Please note that it is *intended behavior that run_pending()
|
||||
// does not run missed jobs*. For example, if you've registered a job
|
||||
// that should run every minute and you only call run_pending()
|
||||
// in one hour increments then your job won't be run 60 times in
|
||||
// between but only once.
|
||||
func RunPending() {
|
||||
defaultScheduler.RunPending()
|
||||
}
|
||||
|
||||
// Run all jobs regardless if they are scheduled to run or not.
|
||||
func RunAll() {
|
||||
defaultScheduler.RunAll()
|
||||
}
|
||||
|
||||
// Run all the jobs with a delay in seconds
|
||||
//
|
||||
// A delay of `delay` seconds is added between each job. This can help
|
||||
// to distribute the system load generated by the jobs more evenly over
|
||||
// time.
|
||||
func RunAllwithDelay(d int) {
|
||||
defaultScheduler.RunAllwithDelay(d)
|
||||
}
|
||||
|
||||
// Run all jobs that are scheduled to run
|
||||
func Start() chan bool {
|
||||
return defaultScheduler.Start()
|
||||
}
|
||||
|
||||
// Clear
|
||||
func Clear() {
|
||||
defaultScheduler.Clear()
|
||||
}
|
||||
|
||||
// Remove
|
||||
func Remove(j interface{}) {
|
||||
defaultScheduler.Remove(j)
|
||||
}
|
||||
|
||||
// NextRun gets the next running time
|
||||
func NextRun() (job Job, time time.Time) {
|
||||
return defaultScheduler.NextRun()
|
||||
}
|
38
commons/scheduler/util.go
Normal file
38
commons/scheduler/util.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// for given function fn, get the name of function.
|
||||
func getFunctionName(fn interface{}) string {
|
||||
return runtime.FuncForPC(reflect.ValueOf((fn)).Pointer()).Name()
|
||||
}
|
||||
|
||||
func formatTime(t string) (hour, min int, err error) {
|
||||
var er = errors.New("time format error")
|
||||
ts := strings.Split(t, ":")
|
||||
if len(ts) != 2 {
|
||||
err = er
|
||||
return
|
||||
}
|
||||
|
||||
hour, err = strconv.Atoi(ts[0])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
min, err = strconv.Atoi(ts[1])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if hour < 0 || hour > 23 || min < 0 || min > 59 {
|
||||
err = er
|
||||
return
|
||||
}
|
||||
return hour, min, nil
|
||||
}
|
|
@ -26,6 +26,10 @@ func ConfigDirPath() string {
|
|||
return path.Join(RootDirPath(), ooccp.PathConfig)
|
||||
}
|
||||
|
||||
func SensorConfigDirPath() string {
|
||||
return path.Join(RootDirPath(), ooccp.PathSensorConfig)
|
||||
}
|
||||
|
||||
func JREDirPath() string {
|
||||
return path.Join(RootDirPath(), ooccp.PathJRE)
|
||||
}
|
||||
|
|
15
config/sensor_config.go
Normal file
15
config/sensor_config.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"path"
|
||||
|
||||
sensorConfigM "git.loafle.net/overflow/overflow_commons_go/modules/sensor_config/model"
|
||||
)
|
||||
|
||||
func SensorConfigContainerDirPath(name string) string {
|
||||
return path.Join(SensorConfigDirPath(), name)
|
||||
}
|
||||
|
||||
func SensorConfigFilePath(sensorConfig *sensorConfigM.SensorConfig) string {
|
||||
return path.Join(SensorConfigContainerDirPath(sensorConfig.Crawler.Container), sensorConfig.Crawler.Name, sensorConfig.ID.String())
|
||||
}
|
|
@ -1,3 +1,3 @@
|
|||
{
|
||||
"tempKey": "84cdd159-2839-439c-97e1-72dc9efadc35"
|
||||
"tempKey": "30e31f01-dc04-47ee-baa0-689ea32158eb"
|
||||
}
|
|
@ -2,6 +2,7 @@ package probe
|
|||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
cdr "git.loafle.net/commons_go/di/registry"
|
||||
|
@ -9,11 +10,21 @@ import (
|
|||
crr "git.loafle.net/commons_go/rpc/registry"
|
||||
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
oocmp "git.loafle.net/overflow/overflow_commons_go/modules/probe"
|
||||
oocu "git.loafle.net/overflow/overflow_commons_go/util"
|
||||
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
|
||||
oopccd "git.loafle.net/overflow/overflow_probes/client/central/data"
|
||||
oopccp "git.loafle.net/overflow/overflow_probes/client/central/probe"
|
||||
oopca "git.loafle.net/overflow/overflow_probes/commons/annotation"
|
||||
oops "git.loafle.net/overflow/overflow_probes/service"
|
||||
"git.loafle.net/overflow/overflow_probes/service"
|
||||
)
|
||||
|
||||
var (
|
||||
servicesToStartAndStop = []reflect.Type{
|
||||
reflect.TypeOf((*service.CentralService)(nil)),
|
||||
reflect.TypeOf((*service.SensorConfigService)(nil)),
|
||||
reflect.TypeOf((*service.CrawlerService)(nil)),
|
||||
reflect.TypeOf((*service.CollectorService)(nil)),
|
||||
}
|
||||
)
|
||||
|
||||
func New() ProbeManager {
|
||||
|
@ -37,7 +48,7 @@ func (pm *probeManagers) Start() error {
|
|||
logging.Logger().Panicf("Probe: already running. Stop it before starting it again")
|
||||
}
|
||||
|
||||
oops.InitService()
|
||||
service.InitService()
|
||||
|
||||
probeRPCRegistry := crr.NewRPCRegistry()
|
||||
|
||||
|
@ -64,14 +75,6 @@ func (pm *probeManagers) Start() error {
|
|||
probeRPCRegistry.RegisterService(s, "")
|
||||
}
|
||||
|
||||
if err := centralProbeClient.Connect(); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := centralDataClient.Connect(); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
pm.stopChan = make(chan struct{})
|
||||
|
||||
pm.stopWg.Add(1)
|
||||
|
@ -96,21 +99,26 @@ func (pm *probeManagers) Stop(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (pm *probeManagers) handleProbe(services []interface{}) {
|
||||
// var err error
|
||||
defer func() {
|
||||
pm.stopWg.Done()
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
oops.StopService(services)
|
||||
err = oocu.ExecuteStarters(services, servicesToStartAndStop, false)
|
||||
if nil != err {
|
||||
logging.Logger().Panic(err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err = oocu.ExecuteStoppers(services, servicesToStartAndStop, true)
|
||||
if nil != err {
|
||||
logging.Logger().Error(err)
|
||||
}
|
||||
|
||||
pm.stopWg.Done()
|
||||
|
||||
pm.Stop(nil)
|
||||
}()
|
||||
|
||||
oops.StartService(services)
|
||||
|
||||
// if err = pm.cClient.Connect(); nil != err {
|
||||
// return
|
||||
// }
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-pm.stopChan:
|
||||
|
|
|
@ -2,6 +2,7 @@ package service
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
cda "git.loafle.net/commons_go/di/annotation"
|
||||
|
@ -23,6 +24,15 @@ type CentralService struct {
|
|||
}
|
||||
|
||||
func (cs *CentralService) Start() error {
|
||||
if nil == cs.CentralClients || 0 == len(cs.CentralClients) {
|
||||
return fmt.Errorf("There is no clients of Central")
|
||||
}
|
||||
|
||||
for k, client := range cs.CentralClients {
|
||||
if err := client.Connect(); nil != err {
|
||||
return fmt.Errorf("Cannot connect to client[%s] of Central %v", k, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
43
service/CollectorService.go
Normal file
43
service/CollectorService.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
|
||||
cda "git.loafle.net/commons_go/di/annotation"
|
||||
cdr "git.loafle.net/commons_go/di/registry"
|
||||
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
"git.loafle.net/overflow/overflow_probes/commons/scheduler"
|
||||
)
|
||||
|
||||
func init() {
|
||||
cdr.RegisterType(reflect.TypeOf((*CollectorService)(nil)))
|
||||
}
|
||||
|
||||
type CollectorService struct {
|
||||
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
|
||||
oocmci.Service
|
||||
|
||||
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
|
||||
ContainerService *ContainerService `annotation:"@Inject()"`
|
||||
CentralService *CentralService `annotation:"@Inject()"`
|
||||
|
||||
crawlingScheduler scheduler.Scheduler
|
||||
}
|
||||
|
||||
func (cs *CollectorService) Start() error {
|
||||
// cs.crawlingScheduler = scheduler.NewScheduler()
|
||||
|
||||
// cs.crawlingScheduler.Every(1).Second().Do(cs.test)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *CollectorService) Stop(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *CollectorService) test() {
|
||||
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"reflect"
|
||||
|
||||
cda "git.loafle.net/commons_go/di/annotation"
|
||||
cdr "git.loafle.net/commons_go/di/registry"
|
||||
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
"git.loafle.net/overflow/overflow_probes/config"
|
||||
)
|
||||
|
||||
func init() {
|
||||
cdr.RegisterType(reflect.TypeOf((*ConfigService)(nil)))
|
||||
|
||||
}
|
||||
|
||||
type ConfigService struct {
|
||||
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
|
||||
oocmci.Service
|
||||
}
|
||||
|
||||
func (cs *ConfigService) Start() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *ConfigService) Stop(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *ConfigService) loadConfigAll() error {
|
||||
configDirPath := config.ConfigDirPath()
|
||||
files, err := ioutil.ReadDir(configDirPath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if file.IsDir() == true {
|
||||
if err := cs.loadConfigDir(path.Join(configDirPath, file.Name())); nil != err {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *ConfigService) loadConfigDir(dirPath string) error {
|
||||
files, err := ioutil.ReadDir(dirPath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
filePath := path.Join(dirPath, file.Name())
|
||||
|
||||
if file.IsDir() == true {
|
||||
cs.loadConfigDir(filePath)
|
||||
} else {
|
||||
_, err := ioutil.ReadFile(filePath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
// var m = config_manager.Config{}
|
||||
// json.Unmarshal(b, &m)
|
||||
// c.configs[file.Name()] = &m
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -6,8 +6,9 @@ import (
|
|||
|
||||
cda "git.loafle.net/commons_go/di/annotation"
|
||||
cdr "git.loafle.net/commons_go/di/registry"
|
||||
"git.loafle.net/commons_go/logging"
|
||||
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
configM "git.loafle.net/overflow/overflow_commons_go/modules/config/model"
|
||||
sensorConfigM "git.loafle.net/overflow/overflow_commons_go/modules/sensor_config/model"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -17,9 +18,24 @@ func init() {
|
|||
type CrawlerService struct {
|
||||
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
|
||||
oocmci.Service
|
||||
|
||||
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
|
||||
ContainerService *ContainerService `annotation:"@Inject()"`
|
||||
}
|
||||
|
||||
func (cs *CrawlerService) Start() error {
|
||||
sensorConfigs := cs.SensorConfigService.sensorConfigs
|
||||
if nil == sensorConfigs || 0 == len(sensorConfigs) {
|
||||
logging.Logger().Infof("Probe: There is no Sensor config")
|
||||
return nil
|
||||
}
|
||||
|
||||
sortedMap := sortSensorConfigPerContainer(sensorConfigs)
|
||||
for containerName, configs := range sortedMap {
|
||||
if err := cs.ContainerService.Send(containerName, "SensorConfigService.Init", configs); nil != err {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -41,6 +57,26 @@ func (cs *CrawlerService) Update() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cs *CrawlerService) Authenticate(crawler *configM.Crawler, target *configM.Target) error {
|
||||
func (cs *CrawlerService) Authenticate(crawler *sensorConfigM.Crawler, target *sensorConfigM.Target) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func sortSensorConfigPerContainer(sensorConfigMap map[string]*sensorConfigM.SensorConfig) map[string][]*sensorConfigM.SensorConfig {
|
||||
if nil == sensorConfigMap || 0 == len(sensorConfigMap) {
|
||||
return nil
|
||||
}
|
||||
|
||||
m := make(map[string][]*sensorConfigM.SensorConfig)
|
||||
|
||||
for _, sensorConfig := range sensorConfigMap {
|
||||
containerName := sensorConfig.Crawler.Container
|
||||
scs, ok := m[containerName]
|
||||
if !ok {
|
||||
scs = make([]*sensorConfigM.SensorConfig, 0)
|
||||
m[containerName] = scs
|
||||
}
|
||||
scs = append(scs, sensorConfig)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
|
142
service/SensorConfigService.go
Normal file
142
service/SensorConfigService.go
Normal file
|
@ -0,0 +1,142 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
|
||||
cda "git.loafle.net/commons_go/di/annotation"
|
||||
cdr "git.loafle.net/commons_go/di/registry"
|
||||
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
sensorConfigM "git.loafle.net/overflow/overflow_commons_go/modules/sensor_config/model"
|
||||
"git.loafle.net/overflow/overflow_probes/config"
|
||||
)
|
||||
|
||||
func init() {
|
||||
cdr.RegisterType(reflect.TypeOf((*SensorConfigService)(nil)))
|
||||
|
||||
}
|
||||
|
||||
type SensorConfigService struct {
|
||||
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
|
||||
oocmci.Service
|
||||
|
||||
sensorConfigs map[string]*sensorConfigM.SensorConfig
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) Start() error {
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
scs.sensorConfigs = make(map[string]*sensorConfigM.SensorConfig)
|
||||
|
||||
if err = scs.loadConfigAll(); nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) Stop(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) loadConfigAll() error {
|
||||
configDirPath := config.ConfigDirPath()
|
||||
files, err := ioutil.ReadDir(configDirPath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if file.IsDir() == true {
|
||||
if err := scs.loadConfigDir(path.Join(configDirPath, file.Name())); nil != err {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) loadConfigDir(dirPath string) error {
|
||||
files, err := ioutil.ReadDir(dirPath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
filePath := path.Join(dirPath, file.Name())
|
||||
|
||||
if file.IsDir() == true {
|
||||
if err := scs.loadConfigDir(filePath); nil != err {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
sc, _, err := scs.loadConfigFile(filePath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
scs.sensorConfigs[file.Name()] = sc
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) loadConfigFile(filePath string) (*sensorConfigM.SensorConfig, []byte, error) {
|
||||
buf, err := ioutil.ReadFile(filePath)
|
||||
if nil != err {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var m = &sensorConfigM.SensorConfig{}
|
||||
if err := json.Unmarshal(buf, m); nil != err {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return m, buf, nil
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) AddConfig(tempFilePath string) error {
|
||||
sc, buf, err := scs.loadConfigFile(tempFilePath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
targetPath := config.SensorConfigFilePath(sc)
|
||||
ioutil.WriteFile(targetPath, buf, 0644)
|
||||
|
||||
// tempfile remove
|
||||
err = os.Remove(tempFilePath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
scs.sensorConfigs[sc.ID.String()] = sc
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (scs *SensorConfigService) RemoveConfig(sensorConfigID string) error {
|
||||
sc, ok := scs.sensorConfigs[sensorConfigID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Probe: SensorConfig[%s] is not exist", sensorConfigID)
|
||||
}
|
||||
|
||||
targetPath := config.SensorConfigFilePath(sc)
|
||||
err := os.Remove(targetPath)
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(scs.sensorConfigs, sensorConfigID)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -7,7 +7,6 @@ import (
|
|||
cda "git.loafle.net/commons_go/di/annotation"
|
||||
cdr "git.loafle.net/commons_go/di/registry"
|
||||
oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
configM "git.loafle.net/overflow/overflow_commons_go/modules/config/model"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -17,6 +16,8 @@ func init() {
|
|||
type SensorService struct {
|
||||
cda.TypeAnnotation `annotation:"@overFlow:Service()"`
|
||||
oocmci.Service
|
||||
|
||||
SensorConfigService *SensorConfigService `annotation:"@Inject()"`
|
||||
}
|
||||
|
||||
func (ss *SensorService) Start() error {
|
||||
|
@ -37,11 +38,13 @@ func (ss *SensorService) StopSensor(id int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ss *SensorService) AddSensor(config *configM.Config) error {
|
||||
func (ss *SensorService) AddSensor(sensorConfigBase64 string) error {
|
||||
// ss.SensorConfigService.AddConfig()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *SensorService) RemoveSensor(id int64) error {
|
||||
func (ss *SensorService) RemoveSensor(sensorConfigID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,72 +1,8 @@
|
|||
package service
|
||||
|
||||
// "context"
|
||||
// "time"
|
||||
|
||||
// "git.loafle.net/commons_go/logging"
|
||||
// oocmci "git.loafle.net/overflow/overflow_commons_go/modules/commons/interfaces"
|
||||
|
||||
func InitService() {
|
||||
}
|
||||
|
||||
func StartService(services []interface{}) {
|
||||
// 1. Sensor config
|
||||
// 2. Container
|
||||
// 3. Collector
|
||||
|
||||
// if nil != services && 0 < len(services) {
|
||||
// for _, service := range services {
|
||||
// if err := service.(oocmci.Service).Start(); err != nil {
|
||||
// logging.Logger().Errorf("Probe: Cannot start service %v", err)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
func StopService(services []interface{}) {
|
||||
// if nil != services && 0 < len(services) {
|
||||
// for _, service := range services {
|
||||
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
// defer cancel()
|
||||
// if err := service.(oocmci.Service).Stop(ctx); err != nil {
|
||||
// logging.Logger().Errorf("Probe: Cannot start service %v", err)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
func DestroyService() {
|
||||
|
||||
}
|
||||
|
||||
// func GetService(name string) interface{} {
|
||||
// var t reflect.Type
|
||||
// switch name {
|
||||
// case "CentralService":
|
||||
// t = reflect.TypeOf((*CentralService)(nil))
|
||||
// case "ConfigService":
|
||||
// t = reflect.TypeOf((*ConfigService)(nil))
|
||||
// case "ContainerService":
|
||||
// t = reflect.TypeOf((*ContainerService)(nil))
|
||||
// case "CrawlerService":
|
||||
// t = reflect.TypeOf((*CrawlerService)(nil))
|
||||
// case "DiscoveryService":
|
||||
// t = reflect.TypeOf((*DiscoveryService)(nil))
|
||||
// case "LogService":
|
||||
// t = reflect.TypeOf((*LogService)(nil))
|
||||
// case "ProbeService":
|
||||
// t = reflect.TypeOf((*ProbeService)(nil))
|
||||
// case "SensorService":
|
||||
// t = reflect.TypeOf((*SensorService)(nil))
|
||||
// default:
|
||||
// logging.Logger().Panicf("Probe: Service[%s] is not exist", name)
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// i, err := cdr.GetInstance(t)
|
||||
// if nil != err {
|
||||
// logging.Logger().Panicf("Probe: Getting Service[%s] is failed %v", name, err)
|
||||
// return nil
|
||||
// }
|
||||
// return i
|
||||
// }
|
||||
|
|
Loading…
Reference in New Issue
Block a user