scheduler

This commit is contained in:
insanity@loafle.com 2017-04-29 16:17:52 +09:00
commit d34505dd62
3 changed files with 329 additions and 0 deletions

61
.gitignore vendored Normal file
View File

@ -0,0 +1,61 @@
# Created by .ignore support plugin (hsz.mobi)
### Go template
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/dictionaries
# Sensitive or high-churn files:
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.xml
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties

227
scheduler.go Normal file
View File

@ -0,0 +1,227 @@
package scheduler
import (
"errors"
"reflect"
"runtime"
"sort"
"time"
)
const (
MAX_TASKS = 10000
)
type Task struct {
id string
intervalSec uint64
TaskFunc string
period time.Duration
taskFunc map[string]interface{}
funcParams map[string]([]interface{})
lastAt time.Time
nextAt time.Time
}
func NewTask(id string, intervel uint64) *Task {
return &Task{
id,
intervel,
"",
0,
make(map[string]interface{}),
make(map[string]([]interface{})),
time.Unix(0, 0),
time.Unix(0, 0),
}
}
func (t *Task) runnable() bool {
return time.Now().After(t.nextAt)
}
func (t *Task) run() {
taskFunc := reflect.ValueOf(t.taskFunc[t.TaskFunc])
taskParams := t.funcParams[t.TaskFunc]
in := make([]reflect.Value, len(taskParams))
for i, param := range taskParams {
in[i] = reflect.ValueOf(param)
}
taskFunc.Call(in)
t.lastAt = time.Now()
t.addNextAt()
}
func (t *Task) invoke(TaskFunc interface{}, params ...interface{}) error {
if reflect.TypeOf(TaskFunc).Kind() != reflect.Func {
return errors.New("Not a function type.")
}
funcName := runtime.FuncForPC(reflect.ValueOf((TaskFunc)).Pointer()).Name()
t.taskFunc[funcName] = TaskFunc
t.funcParams[funcName] = params
t.TaskFunc = funcName
t.addNextAt()
return nil
}
func (t *Task) addNextAt() {
if t.lastAt == time.Unix(0, 0) {
t.lastAt = time.Now()
}
if t.period == 0 {
t.period = time.Duration(t.intervalSec)
t.nextAt = t.lastAt.Add(1 * time.Second)
} else {
t.nextAt = t.lastAt.Add((t.period * time.Second) - time.Second)
}
}
type Scheduler struct {
Tasks [MAX_TASKS]*Task
size int
}
func (s *Scheduler) Len() int {
return s.size
}
func (s *Scheduler) Swap(i, j int) {
s.Tasks[i], s.Tasks[j] = s.Tasks[j], s.Tasks[i]
}
func (s *Scheduler) Less(i, j int) bool {
return s.Tasks[j].nextAt.After(s.Tasks[i].nextAt)
}
func (s *Scheduler) RunnableTasks() ([MAX_TASKS]*Task, int) {
runnableTasks := [MAX_TASKS]*Task{}
n := 0
sort.Sort(s)
for i := 0; i < s.size; i++ {
if s.Tasks[i].runnable() {
runnableTasks[n] = s.Tasks[i]
n++
} else {
break
}
}
return runnableTasks, n
}
func (s *Scheduler) nextRun() (*Task, time.Time) {
if s.size <= 0 {
return nil, time.Now()
}
sort.Sort(s)
return s.Tasks[0], s.Tasks[0].nextAt
}
func (s *Scheduler) addTask(id string, intervalSec uint64) *Task {
s.remove(id)
Task := NewTask(id, intervalSec)
s.Tasks[s.size] = Task
s.size++
return Task
}
func (s *Scheduler) runAll() {
runnableTasks, n := s.RunnableTasks()
if n != 0 {
for i := 0; i < n; i++ {
go runnableTasks[i].run()
}
}
}
func (s *Scheduler) remove(id string) error {
i := 0
var ex bool
for ; i < s.size; i++ {
if s.Tasks[i].id == id {
ex = true
break
}
}
if !ex {
return nil
}
for j := (i + 1); j < s.size; j++ {
s.Tasks[i] = s.Tasks[j]
i++
}
s.size = s.size - 1
return nil
}
func (s *Scheduler) Start() chan bool {
stopped := make(chan bool, 1)
ticker := time.NewTicker(time.Second * 1)
go func() {
for {
select {
case <-ticker.C:
s.runAll()
case <-stopped:
return
}
}
}()
return stopped
}
func (s *Scheduler) NewSchedule(id string, interval uint64, fn interface{}) error {
return s.addTask(id, interval).invoke(fn, id)
}
func (s *Scheduler) RemoveSchedule(id string) error {
i := 0
var ex bool
for ; i < s.size; i++ {
if s.Tasks[i].id == id {
ex = true
break
}
}
if !ex {
return nil
}
for j := (i + 1); j < s.size; j++ {
s.Tasks[i] = s.Tasks[j]
i++
}
s.size = s.size - 1
return nil
}
func (s *Scheduler) RemoveAllSchedule() error {
for i := 0; i < s.size; i++ {
s.Tasks[i] = nil
}
s.size = 0
return nil
}
func (s *Scheduler) UpdateInterval(id string, interval uint64) {
for i := 0; i < s.Len(); i++ {
if s.Tasks[i].id == id {
s.Tasks[i].intervalSec = interval
s.Tasks[i].period = 0
}
}
}

41
scheduler_test.go Normal file
View File

@ -0,0 +1,41 @@
package scheduler
import (
"testing"
"log"
"time"
)
func TestScheduler(t *testing.T) {
s := Scheduler{}
s.Start()
s.NewSchedule("1111", 3, callback)
time.Sleep(time.Second * 10)
s.RemoveSchedule("1111")
time.Sleep(time.Second * 100)
}
func TestMassiveSchedule(t *testing.T) {
s := Scheduler{}
s.Start()
for i := 0; i < 9999; i++ {
s.NewSchedule(string(i), 5, test)
}
s.NewSchedule("#######################", 5, test)
time.Sleep(time.Second * 10)
s.NewSchedule("#######################", 1, test)
time.Sleep(time.Second * 100)
}
func callback(id string) {
log.Println(id)
}
func test(id string) {
if id == "#######################" {
log.Println(id)
}
}