collector-scheduler

This commit is contained in:
insanity@loafle.com 2017-04-14 19:09:51 +09:00
parent f2a577f98c
commit e1a0fe961a
6 changed files with 398 additions and 37 deletions

2
.gitignore vendored
View File

@ -6,7 +6,7 @@
*.so *.so
*.dylib *.dylib
# Test binary, build with `go test -c` # Test binary, build with `go ins -c`
*.test *.test
# Output of the go coverage tool, specifically when used with LiteIDE # Output of the go coverage tool, specifically when used with LiteIDE

View File

@ -1,41 +1,97 @@
package collector_go package collector_go
import ( import (
g "loafle.com/overflow/crawler_go/grpc" sm "loafle.com/overflow/collector_go/scheduler"
"google.golang.org/grpc"
"log" "log"
"context"
) )
type scheduleInfo struct {
sensorId string
interval string
}
type schedules []*scheduleInfo
const ( type Collector struct {
address = "192.168.1.105:50052" scheduler sm.Scheduler
) }
func CallGet() { func (c *Collector) Start() {
result := c.genSchedules()
conn, err := grpc.Dial(address, grpc.WithInsecure()) if len(result) <= 0 {
if err != nil { return
log.Fatalf("did not connect: %v", err)
} }
defer conn.Close() c.scheduler = sm.Scheduler{}
c.scheduler.Init()
for i := 0; i < len(result); i++ {
dc := g.NewDataClient(conn); r := result[i]
c.scheduler.NewSchedule(r.sensorId, r.interval, c.collect)
in := &g.Input{}
in.Id = ""
in.Name = g.Crawlers_HEALTH_DNS
out, err := dc.Get(context.Background(), in)
if err != nil {
log.Println(err)
} }
log.Println(out) }
} func (c *Collector) Stop() {
c.scheduler.RemoveAllSchedule()
}
func (c *Collector) AddSensor(container, crawler, id string) {
s := c.genSchedule(container, crawler, id)
c.scheduler.NewSchedule(s.sensorId, s.interval, c.collect)
}
func (c *Collector) RemoveSensor() {
}
func (c *Collector) collect(id string) {
log.Println("collect ", id)
}
func (c *Collector) genSchedules() schedules {
ss := make([]*scheduleInfo, 0)
s1 := &scheduleInfo{
sensorId: "aa",
interval: "3",
}
s2 := &scheduleInfo{
sensorId: "bb",
interval: "5",
}
ss = append(ss, s1)
ss = append(ss, s2)
return ss
}
func (c *Collector) genSchedule(container, crawler, id string) *scheduleInfo {
s := &scheduleInfo{
sensorId: "cc",
interval: "5",
}
return s
}
// connection
//func CallGet() {
//
// conn, err := grpc.Dial(address, grpc.WithInsecure())
// if err != nil {
// log.Fatalf("did not connect: %v", err)
// }
// defer conn.Close()
//
//
// dc := g.NewDataClient(conn);
//
// in := &g.Input{}
//
// in.Id = ""
// in.Name = g.Crawlers_HEALTH_DNS
//
//
//
// out, err := dc.Get(context.Background(), in) ////
//
// if err != nil {
// log.Println(err)
// }
// log.Println(out)
//
//}

View File

@ -1,12 +1,19 @@
package collector_go package collector_go
import "testing" import (
"testing"
"time"
)
func TestCallGet(t *testing.T) { func TestCallGet(t *testing.T) {
c := Collector{}
c.Start()
CallGet() time.Sleep(time.Second * 30)
container := ""
crawler := ""
id := ""
} c.AddSensor(container, crawler, id)
time.Sleep(time.Second * 30)
}

207
scheduler/cron/cron.go Normal file
View File

@ -0,0 +1,207 @@
package cron
import (
"reflect"
"runtime"
"sort"
"time"
)
const MAX_TASKS = 10000
type Task struct {
id string
intervalSec uint64
TaskFunc string
period time.Duration
taskFunc map[string]interface{}
funcParams map[string]([]interface{})
lastAt time.Time
nextAt time.Time
}
func NewTask(id string, intervel uint64) *Task {
return &Task{
id,
intervel,
"",
0,
make(map[string]interface{}),
make(map[string]([]interface{})),
time.Unix(0, 0),
time.Unix(0, 0),
}
}
func (t *Task) runnable() bool {
return time.Now().After(t.nextAt)
}
func (t *Task) run() {
taskFunc := reflect.ValueOf(t.taskFunc[t.TaskFunc])
taskParams := t.funcParams[t.TaskFunc]
in := make([]reflect.Value, len(taskParams))
for i, param := range taskParams {
in[i] = reflect.ValueOf(param)
}
taskFunc.Call(in)
t.lastAt = time.Now()
t.addNextAt()
}
func (t *Task) Invoke(TaskFunc interface{}, params ...interface{}) {
if reflect.TypeOf(TaskFunc).Kind() != reflect.Func {
panic("Not a function ")
}
funcName := runtime.FuncForPC(reflect.ValueOf((TaskFunc)).Pointer()).Name()
t.taskFunc[funcName] = TaskFunc
t.funcParams[funcName] = params
t.TaskFunc = funcName
t.addNextAt()
}
func (t *Task) addNextAt() {
if t.lastAt == time.Unix(0, 0) {
t.lastAt = time.Now()
}
if t.period == 0 {
t.period = time.Duration(t.intervalSec)
}
t.nextAt = t.lastAt.Add(t.period * time.Second)
}
type Cron struct {
Tasks [MAX_TASKS]*Task
size int
}
func (c *Cron) Len() int {
return c.size
}
func (c *Cron) Swap(i, j int) {
c.Tasks[i], c.Tasks[j] = c.Tasks[j], c.Tasks[i]
}
func (c *Cron) Less(i, j int) bool {
return c.Tasks[j].nextAt.After(c.Tasks[i].nextAt)
}
func NewCron() *Cron {
return &Cron{[MAX_TASKS]*Task{}, 0}
}
func (c *Cron) RunnableTasks() ([MAX_TASKS]*Task, int) {
runnableTasks := [MAX_TASKS]*Task{}
n := 0
sort.Sort(c)
for i := 0; i < c.size; i++ {
if c.Tasks[i].runnable() {
runnableTasks[n] = c.Tasks[i]
n++
} else {
break
}
}
return runnableTasks, n
}
func (c *Cron) nextRun() (*Task, time.Time) {
if c.size <= 0 {
return nil, time.Now()
}
sort.Sort(c)
return c.Tasks[0], c.Tasks[0].nextAt
}
func (c *Cron) addTask(id string, intervalSec uint64) *Task {
Task := NewTask(id, intervalSec)
c.Tasks[c.size] = Task
c.size++
return Task
}
func (c *Cron) updateTask(id string, intervalSec uint64) {
for i := 0; i < c.Len(); i++ {
if c.Tasks[i].id == id {
c.Tasks[i].intervalSec = intervalSec
c.Tasks[i].period = 0
}
}
}
func (c *Cron) runAll() {
runnableTasks, n := c.RunnableTasks()
if n != 0 {
for i := 0; i < n; i++ {
go runnableTasks[i].run()
}
}
}
func (c *Cron) remove(id string) {
i := 0
for ; i < c.size; i++ {
if c.Tasks[i].id == id {
break
}
}
for j := (i + 1); j < c.size; j++ {
c.Tasks[i] = c.Tasks[j]
i++
}
c.size = c.size - 1
}
func (c *Cron) removeAll() {
for i := 0; i < c.size; i++ {
c.Tasks[i] = nil
}
c.size = 0
}
func (c *Cron) start() chan bool {
stopped := make(chan bool, 1)
ticker := time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
c.runAll()
case <-stopped:
return
}
}
}()
return stopped
}
var cron = NewCron()
func Start() chan bool {
return cron.start()
}
func AddTask(id string, intervalSec uint64) *Task {
return cron.addTask(id, intervalSec)
}
func UpdateTask(id string, intervalSec uint64) {
cron.updateTask(id, intervalSec)
}
func RemoveAll() {
cron.removeAll()
}
func Remove(id string) {
cron.remove(id)
}

56
scheduler/scheduler.go Normal file
View File

@ -0,0 +1,56 @@
package scheduler
import (
"loafle.com/overflow/collector_go/scheduler/cron"
c "loafle.com/overflow/crawler_go"
"log"
"strconv"
"sync"
)
const DEFAULT_INTERVAL = 5
type Scheduler struct {
crawler *c.CrawlerImpl
once sync.Once
}
func (s *Scheduler) Init() {
s.once.Do(func() {
cron.Start()
s.crawler = &c.CrawlerImpl{}
})
}
func (s *Scheduler) NewSchedule(id string, interval string, targetFunc interface{}) {
i, err := strconv.Atoi(interval)
if err != nil {
i = DEFAULT_INTERVAL
}
cron.AddTask(id, uint64(i-1)).Invoke(targetFunc, id)
}
func (s *Scheduler) RemoveSchedule(id string) {
cron.Remove(id)
}
func (s *Scheduler) RemoveAllSchedule() {
cron.RemoveAll()
}
func (s *Scheduler) UpdateSchedule(id string, interval string) {
i, err := strconv.Atoi(interval)
if err != nil {
i = DEFAULT_INTERVAL
}
cron.UpdateTask(id, uint64(i-1))
}
func (s *Scheduler) requestGet(id string) {
data, err := s.crawler.Get(id)
if err != nil {
log.Printf("[ID: %s] An error has occurred. %s", id, err.Error())
return
}
log.Println(data)
}

View File

@ -0,0 +1,35 @@
package scheduler
import (
"fmt"
"strconv"
"testing"
"time"
)
func TestSchedul(t *testing.T) {
s := Scheduler{}
s.Init()
for i := 0; i < 10; i++ {
s.NewSchedule(strconv.Itoa(i), "3", test)
}
time.Sleep(time.Second * 10)
////update
fmt.Println("update")
for i := 0; i < 10; i++ {
s.UpdateSchedule(strconv.Itoa(i), "1")
}
time.Sleep(time.Second * 10)
//remove
fmt.Println("remove")
for i := 0; i < 9; i++ {
s.RemoveSchedule(strconv.Itoa(i))
}
time.Sleep(time.Second * 10)
}
func test() {
fmt.Println("test")
}