collector

This commit is contained in:
insanity@loafle.com 2017-04-15 20:32:56 +09:00
parent e1a0fe961a
commit 142f267ab9
5 changed files with 156 additions and 106 deletions

View File

@ -1,71 +1,136 @@
package collector_go package collector_go
import ( import (
sm "loafle.com/overflow/collector_go/scheduler" "encoding/json"
"errors"
"fmt"
"io/ioutil"
s "loafle.com/overflow/collector_go/scheduler"
conf "loafle.com/overflow/crawler_go/config"
"log" "log"
"os"
"path/filepath"
"strings"
"time"
) )
type scheduleInfo struct { const CONFIG_ROOT = "/config/container/"
sensorId string
interval string
}
type schedules []*scheduleInfo
type Collector struct { type Collector struct {
scheduler sm.Scheduler scheduler s.Scheduler
configs []*conf.Config
} }
func (c *Collector) Start() { func (c *Collector) Start() {
result := c.genSchedules() go func() {
if len(result) <= 0 { c.configs = make([]*conf.Config, 0)
if err := c.readAllConfig(CONFIG_ROOT); err != nil {
log.Println(err)
}
c.scheduler = s.Scheduler{}
c.scheduler.Init()
for _, conf := range c.configs {
if err := c.addSensor(conf); err != nil {
log.Println(err)
}
}
}()
}
func (c *Collector) AddSensor(container, crawler, id string) {
config := c.readConfig(CONFIG_ROOT + container + "/" + crawler + "/" + id + ".conf")
if config != nil {
if err := c.addSensor(config); err != nil {
log.Println(err)
}
}
}
func (c *Collector) RemoveSensor(id string) {
if err := c.scheduler.RemoveSchedule(id); err != nil {
log.Println(err)
}
}
func (c *Collector) UpdateSensor(id, interval string) {
if !c.checkExist(id) {
log.Println("Cannot update Sensor : ID not exist [" + id + "]")
return return
} }
c.scheduler = sm.Scheduler{} c.scheduler.UpdateSchedule(id, interval)
c.scheduler.Init()
for i := 0; i < len(result); i++ {
r := result[i]
c.scheduler.NewSchedule(r.sensorId, r.interval, c.collect)
}
} }
func (c *Collector) Stop() { func (c *Collector) Stop() {
c.scheduler.RemoveAllSchedule() c.scheduler.RemoveAllSchedule()
} c.scheduler.Stop()
func (c *Collector) AddSensor(container, crawler, id string) {
s := c.genSchedule(container, crawler, id)
c.scheduler.NewSchedule(s.sensorId, s.interval, c.collect)
}
func (c *Collector) RemoveSensor() {
} }
func (c *Collector) collect(id string) { func (c *Collector) collect(id string) {
log.Println("collect ", id) log.Printf("COLLECT %s - [ID: %s]", time.Now(), id)
} }
func (c *Collector) genSchedules() schedules { func (c *Collector) addSensor(conf *conf.Config) error {
ss := make([]*scheduleInfo, 0) return c.scheduler.NewSchedule(conf.Id, conf.Schedule.Interval, c.collect)
s1 := &scheduleInfo{
sensorId: "aa",
interval: "3",
}
s2 := &scheduleInfo{
sensorId: "bb",
interval: "5",
}
ss = append(ss, s1)
ss = append(ss, s2)
return ss
} }
func (c *Collector) genSchedule(container, crawler, id string) *scheduleInfo { func (c *Collector) readAllConfig(rootPath string) error {
s := &scheduleInfo{
sensorId: "cc", err := filepath.Walk(rootPath, func(path string, f os.FileInfo, err error) error {
interval: "5", if err != nil {
return err
}
if !f.IsDir() && strings.HasSuffix(f.Name(), ".conf") {
c.readConfig(path)
}
return nil
})
if err != nil {
return err
} }
return s
if len(c.configs) <= 0 {
return errors.New("No configuration file found.")
}
return nil
}
func (c *Collector) readConfig(path string) *conf.Config {
bytes, err := ioutil.ReadFile(path)
if err != nil {
fmt.Println(err)
}
conf := conf.Config{}
json.Unmarshal(bytes, &conf)
if err := c.validateConfig(&conf, path); err != nil {
fmt.Println(err)
return nil
}
c.configs = append(c.configs, &conf)
return &conf
}
func (c *Collector) validateConfig(conf *conf.Config, configPath string) error {
//todo : some validations
if c.checkExist(conf.Id) {
return errors.New("The Same Id already exists. " + configPath)
}
return nil
}
func (c *Collector) checkExist(id string) bool {
for _, c := range c.configs {
if c.Id == id {
return true
}
}
return false
} }
// connection // connection

View File

@ -1,6 +1,7 @@
package collector_go package collector_go
import ( import (
"log"
"testing" "testing"
"time" "time"
) )
@ -8,12 +9,13 @@ import (
func TestCallGet(t *testing.T) { func TestCallGet(t *testing.T) {
c := Collector{} c := Collector{}
c.Start() c.Start()
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
container := "" //log.Println("add sensor")
crawler := "" //c.AddSensor("network", "smb", "ttt")
id := "" //time.Sleep(time.Second * 3)
c.AddSensor(container, crawler, id)
time.Sleep(time.Second * 30) log.Println("update sonsor")
c.UpdateSensor("SOEJWEOJWOEJOSDJFOASDJFOSDFO2903870928734", "3")
time.Sleep(time.Second * 20)
} }

View File

@ -1,6 +1,7 @@
package cron package cron
import ( import (
"errors"
"reflect" "reflect"
"runtime" "runtime"
"sort" "sort"
@ -49,9 +50,9 @@ func (t *Task) run() {
t.addNextAt() t.addNextAt()
} }
func (t *Task) Invoke(TaskFunc interface{}, params ...interface{}) { func (t *Task) Invoke(TaskFunc interface{}, params ...interface{}) error {
if reflect.TypeOf(TaskFunc).Kind() != reflect.Func { if reflect.TypeOf(TaskFunc).Kind() != reflect.Func {
panic("Not a function ") return errors.New("Not a function type.")
} }
funcName := runtime.FuncForPC(reflect.ValueOf((TaskFunc)).Pointer()).Name() funcName := runtime.FuncForPC(reflect.ValueOf((TaskFunc)).Pointer()).Name()
@ -59,6 +60,8 @@ func (t *Task) Invoke(TaskFunc interface{}, params ...interface{}) {
t.funcParams[funcName] = params t.funcParams[funcName] = params
t.TaskFunc = funcName t.TaskFunc = funcName
t.addNextAt() t.addNextAt()
return nil
} }
func (t *Task) addNextAt() { func (t *Task) addNextAt() {
@ -68,8 +71,11 @@ func (t *Task) addNextAt() {
if t.period == 0 { if t.period == 0 {
t.period = time.Duration(t.intervalSec) t.period = time.Duration(t.intervalSec)
t.nextAt = t.lastAt.Add(1 * time.Second)
} else {
t.nextAt = t.lastAt.Add(t.period * time.Second)
} }
t.nextAt = t.lastAt.Add(t.period * time.Second)
} }
type Cron struct { type Cron struct {
@ -142,7 +148,7 @@ func (c *Cron) runAll() {
} }
} }
func (c *Cron) remove(id string) { func (c *Cron) remove(id string) error {
i := 0 i := 0
for ; i < c.size; i++ { for ; i < c.size; i++ {
@ -157,13 +163,16 @@ func (c *Cron) remove(id string) {
} }
c.size = c.size - 1 c.size = c.size - 1
return nil
} }
func (c *Cron) removeAll() { func (c *Cron) removeAll() error {
for i := 0; i < c.size; i++ { for i := 0; i < c.size; i++ {
c.Tasks[i] = nil c.Tasks[i] = nil
} }
c.size = 0 c.size = 0
return nil
} }
func (c *Cron) start() chan bool { func (c *Cron) start() chan bool {
@ -198,10 +207,10 @@ func UpdateTask(id string, intervalSec uint64) {
cron.updateTask(id, intervalSec) cron.updateTask(id, intervalSec)
} }
func RemoveAll() { func RemoveAll() error {
cron.removeAll() return cron.removeAll()
} }
func Remove(id string) { func Remove(id string) error {
cron.remove(id) return cron.remove(id)
} }

View File

@ -11,31 +11,32 @@ import (
const DEFAULT_INTERVAL = 5 const DEFAULT_INTERVAL = 5
type Scheduler struct { type Scheduler struct {
crawler *c.CrawlerImpl crawler *c.CrawlerImpl
once sync.Once once sync.Once
cronChan chan bool
} }
func (s *Scheduler) Init() { func (s *Scheduler) Init() {
s.once.Do(func() { s.once.Do(func() {
cron.Start() s.cronChan = cron.Start()
s.crawler = &c.CrawlerImpl{} s.crawler = &c.CrawlerImpl{}
}) })
} }
func (s *Scheduler) NewSchedule(id string, interval string, targetFunc interface{}) { func (s *Scheduler) Stop() {
i, err := strconv.Atoi(interval) s.cronChan <- false
if err != nil {
i = DEFAULT_INTERVAL
}
cron.AddTask(id, uint64(i-1)).Invoke(targetFunc, id)
} }
func (s *Scheduler) RemoveSchedule(id string) { func (s *Scheduler) NewSchedule(id, interval string, fn interface{}) error {
cron.Remove(id) return s.newSchedule(id, interval, fn)
} }
func (s *Scheduler) RemoveAllSchedule() { func (s *Scheduler) RemoveSchedule(id string) error {
cron.RemoveAll() return cron.Remove(id)
}
func (s *Scheduler) RemoveAllSchedule() error {
return cron.RemoveAll()
} }
func (s *Scheduler) UpdateSchedule(id string, interval string) { func (s *Scheduler) UpdateSchedule(id string, interval string) {
@ -46,6 +47,14 @@ func (s *Scheduler) UpdateSchedule(id string, interval string) {
cron.UpdateTask(id, uint64(i-1)) cron.UpdateTask(id, uint64(i-1))
} }
func (s *Scheduler) newSchedule(id string, interval string, fn interface{}) error {
i, err := strconv.Atoi(interval)
if err != nil {
i = DEFAULT_INTERVAL
}
return cron.AddTask(id, uint64(i-1)).Invoke(fn, id)
}
func (s *Scheduler) requestGet(id string) { func (s *Scheduler) requestGet(id string) {
data, err := s.crawler.Get(id) data, err := s.crawler.Get(id)
if err != nil { if err != nil {

View File

@ -1,35 +0,0 @@
package scheduler
import (
"fmt"
"strconv"
"testing"
"time"
)
func TestSchedul(t *testing.T) {
s := Scheduler{}
s.Init()
for i := 0; i < 10; i++ {
s.NewSchedule(strconv.Itoa(i), "3", test)
}
time.Sleep(time.Second * 10)
////update
fmt.Println("update")
for i := 0; i < 10; i++ {
s.UpdateSchedule(strconv.Itoa(i), "1")
}
time.Sleep(time.Second * 10)
//remove
fmt.Println("remove")
for i := 0; i < 9; i++ {
s.RemoveSchedule(strconv.Itoa(i))
}
time.Sleep(time.Second * 10)
}
func test() {
fmt.Println("test")
}