collector

This commit is contained in:
insanity@loafle.com 2017-04-25 12:14:15 +09:00
parent 7b4cf8f2b9
commit 0876129a3a
4 changed files with 27 additions and 223 deletions

View File

@ -1,20 +1,20 @@
package collector_go
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
s "loafle.com/overflow/collector_go/scheduler"
conf "loafle.com/overflow/crawler_go/config"
g "loafle.com/overflow/crawler_go/grpc"
crm "loafle.com/overflow/crawler_manager_go"
"log"
"os"
"path/filepath"
"strings"
"time"
crm "loafle.com/overflow/crawler_manager_go"
"context"
g "loafle.com/overflow/crawler_go/grpc"
)
const CONFIG_ROOT = "/config/container"
@ -56,12 +56,12 @@ func (c *Collector) collect(id string) {
}
defer conn.Close()
dc := g.NewDataClient(conn);
dc := g.NewDataClient(conn)
in := &g.Input{}
in.Id = id
in.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name])
out, err := dc.Get(context.Background(), in)
if err != nil {
@ -82,7 +82,7 @@ func (c *Collector) collect(id string) {
func (c *Collector) AddSensor(conf *conf.Config) {
if c.checkExist(conf.Id) {
log.Println("he Same Id already exists.")
log.Println("The Same Id already exists.")
return
}
c.configs[conf.Id] = conf

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
conf "loafle.com/overflow/crawler_go/config"
"loafle.com/overflow/cron_go"
"log"
"testing"
"time"
@ -36,3 +37,22 @@ func newConf() *conf.Config {
return &conf
}
func TestPooling(t *testing.T) {
c := &cron.Cron{}
c.Start()
c.AddTask("polling", 3).Invoke(handlePolling)
time.Sleep(time.Second * 10)
fmt.Println("UPDATE")
c.UpdateTask("polling", 1)
time.Sleep(time.Second * 10)
fmt.Println("UPDATE")
c.UpdateTask("polling", 3)
time.Sleep(time.Second * 10)
}
func handlePolling() {
fmt.Println(time.Now(), " polling")
}

View File

@ -1,216 +0,0 @@
package cron
import (
"errors"
"reflect"
"runtime"
"sort"
"time"
)
const MAX_TASKS = 10000
type Task struct {
id string
intervalSec uint64
TaskFunc string
period time.Duration
taskFunc map[string]interface{}
funcParams map[string]([]interface{})
lastAt time.Time
nextAt time.Time
}
func NewTask(id string, intervel uint64) *Task {
return &Task{
id,
intervel,
"",
0,
make(map[string]interface{}),
make(map[string]([]interface{})),
time.Unix(0, 0),
time.Unix(0, 0),
}
}
func (t *Task) runnable() bool {
return time.Now().After(t.nextAt)
}
func (t *Task) run() {
taskFunc := reflect.ValueOf(t.taskFunc[t.TaskFunc])
taskParams := t.funcParams[t.TaskFunc]
in := make([]reflect.Value, len(taskParams))
for i, param := range taskParams {
in[i] = reflect.ValueOf(param)
}
taskFunc.Call(in)
t.lastAt = time.Now()
t.addNextAt()
}
func (t *Task) Invoke(TaskFunc interface{}, params ...interface{}) error {
if reflect.TypeOf(TaskFunc).Kind() != reflect.Func {
return errors.New("Not a function type.")
}
funcName := runtime.FuncForPC(reflect.ValueOf((TaskFunc)).Pointer()).Name()
t.taskFunc[funcName] = TaskFunc
t.funcParams[funcName] = params
t.TaskFunc = funcName
t.addNextAt()
return nil
}
func (t *Task) addNextAt() {
if t.lastAt == time.Unix(0, 0) {
t.lastAt = time.Now()
}
if t.period == 0 {
t.period = time.Duration(t.intervalSec)
t.nextAt = t.lastAt.Add(1 * time.Second)
} else {
t.nextAt = t.lastAt.Add((t.period * time.Second) - time.Second)
}
}
type Cron struct {
Tasks [MAX_TASKS]*Task
size int
}
func (c *Cron) Len() int {
return c.size
}
func (c *Cron) Swap(i, j int) {
c.Tasks[i], c.Tasks[j] = c.Tasks[j], c.Tasks[i]
}
func (c *Cron) Less(i, j int) bool {
return c.Tasks[j].nextAt.After(c.Tasks[i].nextAt)
}
func NewCron() *Cron {
return &Cron{[MAX_TASKS]*Task{}, 0}
}
func (c *Cron) RunnableTasks() ([MAX_TASKS]*Task, int) {
runnableTasks := [MAX_TASKS]*Task{}
n := 0
sort.Sort(c)
for i := 0; i < c.size; i++ {
if c.Tasks[i].runnable() {
runnableTasks[n] = c.Tasks[i]
n++
} else {
break
}
}
return runnableTasks, n
}
func (c *Cron) nextRun() (*Task, time.Time) {
if c.size <= 0 {
return nil, time.Now()
}
sort.Sort(c)
return c.Tasks[0], c.Tasks[0].nextAt
}
func (c *Cron) addTask(id string, intervalSec uint64) *Task {
Task := NewTask(id, intervalSec)
c.Tasks[c.size] = Task
c.size++
return Task
}
func (c *Cron) updateTask(id string, intervalSec uint64) {
for i := 0; i < c.Len(); i++ {
if c.Tasks[i].id == id {
c.Tasks[i].intervalSec = intervalSec
c.Tasks[i].period = 0
}
}
}
func (c *Cron) runAll() {
runnableTasks, n := c.RunnableTasks()
if n != 0 {
for i := 0; i < n; i++ {
go runnableTasks[i].run()
}
}
}
func (c *Cron) remove(id string) error {
i := 0
for ; i < c.size; i++ {
if c.Tasks[i].id == id {
break
}
}
for j := (i + 1); j < c.size; j++ {
c.Tasks[i] = c.Tasks[j]
i++
}
c.size = c.size - 1
return nil
}
func (c *Cron) removeAll() error {
for i := 0; i < c.size; i++ {
c.Tasks[i] = nil
}
c.size = 0
return nil
}
func (c *Cron) start() chan bool {
stopped := make(chan bool, 1)
ticker := time.NewTicker(time.Second * 1)
go func() {
for {
select {
case <-ticker.C:
c.runAll()
case <-stopped:
return
}
}
}()
return stopped
}
var cron = NewCron()
func Start() chan bool {
return cron.start()
}
func AddTask(id string, intervalSec uint64) *Task {
return cron.addTask(id, intervalSec)
}
func UpdateTask(id string, intervalSec uint64) {
cron.updateTask(id, intervalSec)
}
func RemoveAll() error {
return cron.removeAll()
}
func Remove(id string) error {
return cron.remove(id)
}

View File

@ -1,8 +1,8 @@
package scheduler
import (
"loafle.com/overflow/collector_go/scheduler/cron"
c "loafle.com/overflow/crawler_go"
"loafle.com/overflow/cron_go"
"log"
"strconv"
"sync"