util-go/time/scheduler/scheduler.go

218 lines
5.5 KiB
Go
Raw Permalink Normal View History

2018-04-28 12:39:55 +00:00
package scheduler
import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
2018-04-28 13:17:13 +00:00
"git.loafle.net/commons/util-go/time/scheduler/storage"
"git.loafle.net/commons/util-go/time/scheduler/task"
2018-04-28 12:39:55 +00:00
)
type Scheduler struct {
taskTargetRegistry *task.TaskTargetRegistry
tasks sync.Map
adapter *storageAdapter
stopChan chan bool
}
// New will return a new instance of the Scheduler struct.
func New(taskStorage storage.TaskStorage) *Scheduler {
taskTargetRegistry := &task.TaskTargetRegistry{}
return &Scheduler{
taskTargetRegistry: taskTargetRegistry,
stopChan: make(chan bool),
adapter: &storageAdapter{
taskStorage: taskStorage,
taskTargetRegistry: taskTargetRegistry,
},
}
}
// RunAt will schedule function to be executed once at the given time.
func (s *Scheduler) RunAt(time time.Time, targetFunc interface{}, params ...interface{}) (string, error) {
taskTarget, err := s.taskTargetRegistry.Add(targetFunc)
if err != nil {
return "", err
}
task := task.New(taskTarget, params)
task.NextRun = time
s.registerTask(task)
return task.Hash(), nil
}
// RunAfter executes function once after a specific duration has elapsed.
func (s *Scheduler) RunAfter(duration time.Duration, targetFunc interface{}, params ...interface{}) (string, error) {
return s.RunAt(time.Now().Add(duration), targetFunc, params...)
}
// RunEvery will schedule function to be executed every time the duration has elapsed.
func (s *Scheduler) RunEvery(duration time.Duration, targetFunc interface{}, params ...interface{}) (string, error) {
taskTarget, err := s.taskTargetRegistry.Add(targetFunc)
if err != nil {
return "", err
}
task := task.New(taskTarget, params)
task.IsRecurring = true
task.Duration = duration
task.NextRun = time.Now().Add(duration)
s.registerTask(task)
return task.Hash(), nil
}
// Start will run the scheduler's timer and will trigger the execution
// of tasks depending on their schedule.
func (s *Scheduler) Start() error {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Populate tasks from storage
if err := s.populateTasks(); err != nil {
return err
}
if err := s.persistRegisteredTasks(); err != nil {
return err
}
s.runPending()
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
s.runPending()
case <-sigChan:
s.stopChan <- true
case <-s.stopChan:
close(s.stopChan)
}
}
}()
return nil
}
// Stop will put the scheduler to halt
func (s *Scheduler) Stop() {
s.stopChan <- true
}
// Wait is a convenience function for blocking until the scheduler is stopped.
func (s *Scheduler) Wait() {
<-s.stopChan
}
// Cancel is used to cancel the planned execution of a specific task using it's ID.
// The ID is returned when the task was scheduled using RunAt, RunAfter or RunEvery
func (s *Scheduler) Cancel(taskID string) error {
t, ok := s.tasks.Load(taskID)
if !ok {
return fmt.Errorf("Task not found")
}
s.adapter.Remove(t.(*task.Task))
s.tasks.Delete(taskID)
return nil
}
// Clear will cancel the execution and clear all registered tasks.
func (s *Scheduler) Clear() {
s.tasks.Range(func(_taskID, _task interface{}) bool {
s.adapter.Remove(_task.(*task.Task))
s.tasks.Delete(_taskID.(string))
return true
})
s.taskTargetRegistry = &task.TaskTargetRegistry{}
}
func (s *Scheduler) populateTasks() error {
tasks, err := s.adapter.Fetch()
if err != nil {
return err
}
for _, _task := range tasks {
// If we can't find the function, it's been changed/removed by user
exists := s.taskTargetRegistry.Exists(_task.TaskTarget.Name)
if !exists {
log.Printf("%s was not found, it will be removed\n", _task.TaskTarget.Name)
_ = s.adapter.Remove(_task)
continue
}
// If the task instance is still registered with the same computed hash then move on.
// Otherwise, one of the attributes changed and therefore, the task instance should
// be added to the list of tasks to be executed with the stored params
_eTask, ok := s.tasks.Load(_task.Hash())
if !ok {
log.Printf("Detected a change in attributes of one of the instances of task %s, \n",
_task.TaskTarget.Name)
_task.TaskTarget, _ = s.taskTargetRegistry.Get(_task.TaskTarget.Name)
_eTask = _task
s.tasks.Store(_task.Hash(), _eTask)
}
eTask := _eTask.(*task.Task)
// Skip task which is not a recurring one and the NextRun has already passed
if !_task.IsRecurring && _task.NextRun.Before(time.Now()) {
// We might have a task instance which was executed already.
// In this case, delete it.
_ = s.adapter.Remove(_task)
s.tasks.Delete(_task.Hash())
continue
}
// Duration may have changed for recurring tasks
if _task.IsRecurring && eTask.Duration != _task.Duration {
// Reschedule NextRun based on _task.LastRun + registeredTask.Duration
eTask.NextRun = _task.LastRun.Add(eTask.Duration)
}
}
return nil
}
func (s *Scheduler) persistRegisteredTasks() error {
var err error
s.tasks.Range(func(_taskID, _task interface{}) bool {
err = s.adapter.Add(_task.(*task.Task))
if nil != err {
return false
}
return true
})
return err
}
func (s *Scheduler) runPending() {
s.tasks.Range(func(_taskID, _task interface{}) bool {
t := _task.(*task.Task)
if t.IsDue() {
go t.Run()
if !t.IsRecurring {
s.adapter.Remove(t)
s.tasks.Delete(t.Hash())
}
}
return true
})
}
func (s *Scheduler) registerTask(task *task.Task) {
s.taskTargetRegistry.Add(task.TaskTarget)
s.tasks.Store(task.Hash(), task)
}