overflow_probes/commons/scheduler/Scheduler.go
crusader a3ad29ff73 ing
2018-03-27 00:14:19 +09:00

153 lines
2.7 KiB
Go

package scheduler
import (
"sort"
"time"
)
// Scheduler describes a fixed-capacity collection of jobs that can be ordered
// by next run time and executed either on demand or from a background ticker.
//
// Len, Less and Swap have the same signatures as the methods of
// sort.Interface, so a Scheduler value can be handed to sort.Sort.
type Scheduler interface {
	// Len reports how many jobs are currently registered.
	Len() int
	// Less reports whether job i is due before job j.
	Less(i, j int) bool
	// Swap exchanges the jobs at positions i and j.
	Swap(i, j int)

	// Every registers a new job created from interval and returns it.
	Every(interval uint64) Job
	// Remove unregisters the job identified by j.
	Remove(j interface{})
	// Clear unregisters every job.
	Clear()

	// NextRun returns the job that is due soonest and the time it is due.
	NextRun() (Job, time.Time)

	// RunPending runs the jobs that are currently due.
	RunPending()
	// RunAll runs every registered job whether or not it is due.
	RunAll()
	// RunAllwithDelay runs every registered job, pausing after each one.
	// d is the length of the pause; see the implementation for its unit.
	RunAllwithDelay(d int)
	// Start begins running pending jobs in the background and returns the
	// channel used to stop that background loop.
	Start() chan bool

	// getRunnableJobs returns the due jobs packed at the front of the
	// array, and how many of them there are.
	getRunnableJobs() (running_jobs [MAXJOBNUM]Job, n int)
}
// NewScheduler returns a fresh, empty scheduler: no jobs are registered and
// every slot of its job array is unset.
func NewScheduler() Scheduler {
	return new(innerScheduler)
}
// innerScheduler is the Scheduler implementation handed out by NewScheduler.
type innerScheduler struct {
// jobs is the fixed-size backing array for the registered jobs. The methods
// of innerScheduler only ever look at its first size entries.
jobs [MAXJOBNUM]Job
// size is the number of jobs currently held in jobs.
size int
}
// Len reports the number of jobs currently registered. Together with Swap and
// Less it allows the scheduler to be passed to sort.Sort.
func (s *innerScheduler) Len() int {
	count := s.size
	return count
}
// Swap exchanges the jobs stored at positions i and j.
func (s *innerScheduler) Swap(i, j int) {
	held := s.jobs[i]
	s.jobs[i] = s.jobs[j]
	s.jobs[j] = held
}
// Less reports whether job i is due strictly before job j, so that sorting
// the scheduler puts the job with the earliest next run time first.
func (s *innerScheduler) Less(i, j int) bool {
	right := s.jobs[j].nextRun()
	left := s.jobs[i].nextRun()
	return left.Before(right)
}
// getRunnableJobs sorts the registered jobs by next run time and collects the
// leading jobs whose shouldRun reports true.
//
// Collection stops at the first job that is not due, so a due job that sorts
// after a non-due one is not returned. The due jobs are packed at the front of
// running_jobs and n is how many there are.
func (s *innerScheduler) getRunnableJobs() (running_jobs [MAXJOBNUM]Job, n int) {
	sort.Sort(s)
	for i := 0; i < s.size && s.jobs[i].shouldRun(); i++ {
		running_jobs[n] = s.jobs[i]
		n++
	}
	return running_jobs, n
}
// NextRun returns the job with the earliest next run time together with that
// time. As a side effect the registered jobs are re-sorted by next run time.
//
// When no jobs are registered it returns a nil Job and the current time.
func (s *innerScheduler) NextRun() (Job, time.Time) {
	if s.size > 0 {
		sort.Sort(s)
		soonest := s.jobs[0]
		return soonest, s.jobs[0].nextRun()
	}
	return nil, time.Now()
}
// Every creates a job with NewJob(interval), stores it in the next free slot
// and returns it.
//
// The job array has room for MAXJOBNUM entries; calling Every when it is
// already full panics with an index-out-of-range error.
func (s *innerScheduler) Every(interval uint64) Job {
	created := NewJob(interval)
	slot := s.size
	s.jobs[slot] = created
	s.size = slot + 1
	return created
}
// RunPending asks getRunnableJobs for the jobs that are currently due and
// runs each of them in turn, in the order they were returned.
func (s *innerScheduler) RunPending() {
	due, count := s.getRunnableJobs()
	for _, job := range due[:count] {
		job.run()
	}
}
// RunAll runs every registered job once, in current array order, without
// checking whether the job is due.
func (s *innerScheduler) RunAll() {
	next := 0
	// s.size is re-read on every pass, exactly like a classic index loop.
	for next < s.size {
		s.jobs[next].run()
		next++
	}
}
// RunAllwithDelay runs every registered job once, in current array order,
// and sleeps for d seconds after each job (including the last one).
//
// d is a number of seconds. A bare int converted with time.Duration(d) would
// be d nanoseconds, so the value is scaled by time.Second before sleeping.
// A zero or negative d makes the sleep return immediately.
func (s *innerScheduler) RunAllwithDelay(d int) {
	pause := time.Duration(d) * time.Second
	for i := 0; i < s.size; i++ {
		s.jobs[i].run()
		time.Sleep(pause)
	}
}
// Remove unregisters the first job whose jobFunc() equals getFunctionName(j).
//
// The jobs after it are shifted down by one so the used part of the array
// stays contiguous, and the vacated last slot is cleared so the scheduler no
// longer holds a reference to the removed job. If no job matches, the
// scheduler is left unchanged.
func (s *innerScheduler) Remove(j interface{}) {
	if s.size <= 0 {
		return
	}
	target := getFunctionName(j)
	found := -1
	for i := 0; i < s.size; i++ {
		if s.jobs[i].jobFunc() == target {
			found = i
			break
		}
	}
	if found < 0 {
		// Nothing matched: shrinking here would silently drop the last job.
		return
	}
	copy(s.jobs[found:s.size], s.jobs[found+1:s.size])
	s.jobs[s.size-1] = nil
	s.size--
}
// Clear unregisters every job: each used slot is reset to nil and the job
// count drops to zero.
func (s *innerScheduler) Clear() {
	// Walk the used slots from the back, releasing each one.
	for s.size > 0 {
		s.size--
		s.jobs[s.size] = nil
	}
	s.size = 0
}
// Start launches a background goroutine that calls RunPending once per second
// and returns the channel that stops it.
//
// Sending any value on the returned channel, or closing it, makes the
// goroutine return. The channel has a buffer of one, so a single stop signal
// can be sent without blocking. The ticker is stopped when the goroutine
// exits so that it does not keep firing after the scheduler is stopped.
func (s *innerScheduler) Start() chan bool {
	stopped := make(chan bool, 1)
	ticker := time.NewTicker(1 * time.Second)
	go func() {
		// Release the ticker as soon as the loop is told to stop.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				s.RunPending()
			case <-stopped:
				return
			}
		}
	}()
	return stopped
}