This commit is contained in:
crusader 2018-04-28 21:39:55 +09:00
parent 595f56aa7c
commit 6b4beb77fb
7 changed files with 612 additions and 0 deletions

217
time/scheduler/scheduler.go Normal file
View File

@ -0,0 +1,217 @@
package scheduler
import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"git.loafle.net/overflow/probe/service/scheduler/storage"
"git.loafle.net/overflow/probe/service/scheduler/task"
)
type Scheduler struct {
taskTargetRegistry *task.TaskTargetRegistry
tasks sync.Map
adapter *storageAdapter
stopChan chan bool
}
// New will return a new instance of the Scheduler struct.
func New(taskStorage storage.TaskStorage) *Scheduler {
taskTargetRegistry := &task.TaskTargetRegistry{}
return &Scheduler{
taskTargetRegistry: taskTargetRegistry,
stopChan: make(chan bool),
adapter: &storageAdapter{
taskStorage: taskStorage,
taskTargetRegistry: taskTargetRegistry,
},
}
}
// RunAt will schedule function to be executed once at the given time.
func (s *Scheduler) RunAt(time time.Time, targetFunc interface{}, params ...interface{}) (string, error) {
taskTarget, err := s.taskTargetRegistry.Add(targetFunc)
if err != nil {
return "", err
}
task := task.New(taskTarget, params)
task.NextRun = time
s.registerTask(task)
return task.Hash(), nil
}
// RunAfter executes function once after a specific duration has elapsed.
func (s *Scheduler) RunAfter(duration time.Duration, targetFunc interface{}, params ...interface{}) (string, error) {
return s.RunAt(time.Now().Add(duration), targetFunc, params...)
}
// RunEvery will schedule function to be executed every time the duration has elapsed.
func (s *Scheduler) RunEvery(duration time.Duration, targetFunc interface{}, params ...interface{}) (string, error) {
taskTarget, err := s.taskTargetRegistry.Add(targetFunc)
if err != nil {
return "", err
}
task := task.New(taskTarget, params)
task.IsRecurring = true
task.Duration = duration
task.NextRun = time.Now().Add(duration)
s.registerTask(task)
return task.Hash(), nil
}
// Start will run the scheduler's timer and will trigger the execution
// of tasks depending on their schedule.
func (s *Scheduler) Start() error {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Populate tasks from storage
if err := s.populateTasks(); err != nil {
return err
}
if err := s.persistRegisteredTasks(); err != nil {
return err
}
s.runPending()
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
s.runPending()
case <-sigChan:
s.stopChan <- true
case <-s.stopChan:
close(s.stopChan)
}
}
}()
return nil
}
// Stop will put the scheduler to halt
func (s *Scheduler) Stop() {
s.stopChan <- true
}
// Wait is a convenience function for blocking until the scheduler is stopped.
func (s *Scheduler) Wait() {
<-s.stopChan
}
// Cancel is used to cancel the planned execution of a specific task using it's ID.
// The ID is returned when the task was scheduled using RunAt, RunAfter or RunEvery
func (s *Scheduler) Cancel(taskID string) error {
t, ok := s.tasks.Load(taskID)
if !ok {
return fmt.Errorf("Task not found")
}
s.adapter.Remove(t.(*task.Task))
s.tasks.Delete(taskID)
return nil
}
// Clear will cancel the execution and clear all registered tasks.
func (s *Scheduler) Clear() {
s.tasks.Range(func(_taskID, _task interface{}) bool {
s.adapter.Remove(_task.(*task.Task))
s.tasks.Delete(_taskID.(string))
return true
})
s.taskTargetRegistry = &task.TaskTargetRegistry{}
}
func (s *Scheduler) populateTasks() error {
tasks, err := s.adapter.Fetch()
if err != nil {
return err
}
for _, _task := range tasks {
// If we can't find the function, it's been changed/removed by user
exists := s.taskTargetRegistry.Exists(_task.TaskTarget.Name)
if !exists {
log.Printf("%s was not found, it will be removed\n", _task.TaskTarget.Name)
_ = s.adapter.Remove(_task)
continue
}
// If the task instance is still registered with the same computed hash then move on.
// Otherwise, one of the attributes changed and therefore, the task instance should
// be added to the list of tasks to be executed with the stored params
_eTask, ok := s.tasks.Load(_task.Hash())
if !ok {
log.Printf("Detected a change in attributes of one of the instances of task %s, \n",
_task.TaskTarget.Name)
_task.TaskTarget, _ = s.taskTargetRegistry.Get(_task.TaskTarget.Name)
_eTask = _task
s.tasks.Store(_task.Hash(), _eTask)
}
eTask := _eTask.(*task.Task)
// Skip task which is not a recurring one and the NextRun has already passed
if !_task.IsRecurring && _task.NextRun.Before(time.Now()) {
// We might have a task instance which was executed already.
// In this case, delete it.
_ = s.adapter.Remove(_task)
s.tasks.Delete(_task.Hash())
continue
}
// Duration may have changed for recurring tasks
if _task.IsRecurring && eTask.Duration != _task.Duration {
// Reschedule NextRun based on _task.LastRun + registeredTask.Duration
eTask.NextRun = _task.LastRun.Add(eTask.Duration)
}
}
return nil
}
func (s *Scheduler) persistRegisteredTasks() error {
var err error
s.tasks.Range(func(_taskID, _task interface{}) bool {
err = s.adapter.Add(_task.(*task.Task))
if nil != err {
return false
}
return true
})
return err
}
func (s *Scheduler) runPending() {
s.tasks.Range(func(_taskID, _task interface{}) bool {
t := _task.(*task.Task)
if t.IsDue() {
go t.Run()
if !t.IsRecurring {
s.adapter.Remove(t)
s.tasks.Delete(t.Hash())
}
}
return true
})
}
func (s *Scheduler) registerTask(task *task.Task) {
s.taskTargetRegistry.Add(task.TaskTarget)
s.tasks.Store(task.Hash(), task)
}

View File

@ -0,0 +1,41 @@
package scheduler_test
import (
"log"
"testing"
"time"
"git.loafle.net/overflow/probe/service/scheduler"
"git.loafle.net/overflow/probe/service/scheduler/storage"
)
func TestDiscoverPort(t *testing.T) {
_storage := storage.NewMemoryStorage()
s := scheduler.New(_storage)
// Start a task with arguments
tID1, err := s.RunEvery(1*time.Second, TaskWithArgs, "Hello from recurring task 1")
if err != nil {
log.Fatal(err)
}
// Start the same task as above with a different argument
_, err = s.RunEvery(4*time.Second, TaskWithArgs, "Hello from recurring task 2")
if err != nil {
log.Fatal(err)
}
s.Start()
time.Sleep(4 * time.Second)
s.Cancel(tID1)
s.Wait()
}
func TaskWithoutArgs() {
log.Println("TaskWithoutArgs is executed")
}
func TaskWithArgs(message string) {
log.Println("TaskWithArgs is executed. message:", message)
}

141
time/scheduler/storage.go Normal file
View File

@ -0,0 +1,141 @@
package scheduler
import (
"encoding/json"
"reflect"
"strconv"
"strings"
"time"
"git.loafle.net/overflow/probe/service/scheduler/storage"
"git.loafle.net/overflow/probe/service/scheduler/task"
)
type storageAdapter struct {
taskStorage storage.TaskStorage
taskTargetRegistry *task.TaskTargetRegistry
}
func (a *storageAdapter) Add(task *task.Task) error {
ta, err := a.getTaskAttribute(task)
if err != nil {
return err
}
return a.taskStorage.Add(ta)
}
func (a *storageAdapter) Fetch() ([]*task.Task, error) {
tas, err := a.taskStorage.Fetch()
if err != nil {
return nil, err
}
var tasks []*task.Task
for _, ta := range tas {
lastRun, err := time.Parse(time.RFC3339, ta.LastRun)
if err != nil {
return nil, err
}
nextRun, err := time.Parse(time.RFC3339, ta.NextRun)
if err != nil {
return nil, err
}
duration, err := time.ParseDuration(ta.Duration)
if err != nil {
return nil, err
}
isRecurring, err := strconv.Atoi(ta.IsRecurring)
if err != nil {
return nil, err
}
taskTarget, err := a.taskTargetRegistry.Get(ta.Name)
if err != nil {
return nil, err
}
params, err := paramsFromString(taskTarget, ta.Params)
if err != nil {
return nil, err
}
t := task.NewWithSchedule(taskTarget, params, &task.TaskSchedule{
IsRecurring: isRecurring == 1,
Duration: time.Duration(duration),
LastRun: lastRun,
NextRun: nextRun,
})
tasks = append(tasks, t)
}
return tasks, nil
}
func (a *storageAdapter) Remove(task *task.Task) error {
ta, err := a.getTaskAttribute(task)
if err != nil {
return err
}
return a.taskStorage.Remove(ta)
}
func (a *storageAdapter) getTaskAttribute(task *task.Task) (*storage.TaskAttribute, error) {
params, err := paramsToString(task.Params)
if err != nil {
return nil, err
}
isRecurring := 0
if task.IsRecurring {
isRecurring = 1
}
return &storage.TaskAttribute{
Hash: string(task.Hash()),
Name: task.TaskTarget.Name,
LastRun: task.LastRun.Format(time.RFC3339),
NextRun: task.NextRun.Format(time.RFC3339),
Duration: task.Duration.String(),
IsRecurring: strconv.Itoa(isRecurring),
Params: params,
}, nil
}
func paramsToString(params []interface{}) (string, error) {
var paramsList []string
for _, param := range params {
paramStr, err := json.Marshal(param)
if err != nil {
return "", err
}
paramsList = append(paramsList, string(paramStr))
}
data, err := json.Marshal(paramsList)
return string(data), err
}
func paramsFromString(taskTarget *task.TaskTarget, payload string) ([]interface{}, error) {
var params []interface{}
if strings.TrimSpace(payload) == "" {
return params, nil
}
paramTypes := taskTarget.Params()
var paramsStrings []string
err := json.Unmarshal([]byte(payload), &paramsStrings)
if err != nil {
return params, err
}
for i, paramStr := range paramsStrings {
paramType := paramTypes[i]
target := reflect.New(paramType)
err := json.Unmarshal([]byte(paramStr), target.Interface())
if err != nil {
return params, err
}
param := reflect.Indirect(target).Interface()
params = append(params, param)
}
return params, nil
}

View File

@ -0,0 +1,35 @@
package storage
// MemoryStorage is a memory task store
type MemoryStorage struct {
taskAttributes []*TaskAttribute
}
// NewMemoryStorage returns an instance of MemoryStorage.
func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{}
}
// Add adds a task to the memory store.
func (s *MemoryStorage) Add(taskAttribute *TaskAttribute) error {
s.taskAttributes = append(s.taskAttributes, taskAttribute)
return nil
}
// Fetch will return all tasks stored.
func (s *MemoryStorage) Fetch() ([]*TaskAttribute, error) {
return s.taskAttributes, nil
}
// Remove will remove task from store
func (s *MemoryStorage) Remove(taskAttribute *TaskAttribute) error {
var newTaskAttributes []*TaskAttribute
for _, eTaskAttribute := range s.taskAttributes {
if taskAttribute.Hash == eTaskAttribute.Hash {
continue
}
newTaskAttributes = append(newTaskAttributes, eTaskAttribute)
}
s.taskAttributes = newTaskAttributes
return nil
}

View File

@ -0,0 +1,18 @@
package storage
type TaskAttribute struct {
Hash string
Name string
LastRun string
NextRun string
Duration string
IsRecurring string
Params string
}
// TaskStorage is the interface to implement when adding custom task storage.
type TaskStorage interface {
Add(*TaskAttribute) error
Fetch() ([]*TaskAttribute, error)
Remove(*TaskAttribute) error
}

View File

@ -0,0 +1,81 @@
package task
import (
"fmt"
"reflect"
"runtime"
"sync"
)
type TaskTarget struct {
Name string
targetFunc interface{}
paramTypes map[string]reflect.Type
}
// Params returns the list of parameter types
func (t *TaskTarget) Params() []reflect.Type {
ft := reflect.TypeOf(t.targetFunc)
paramTypes := make([]reflect.Type, ft.NumIn())
for idx := 0; idx < ft.NumIn(); idx++ {
in := ft.In(idx)
paramTypes[idx] = in
}
return paramTypes
}
type TaskTargetRegistry struct {
targetFuncs sync.Map
}
// Add appends the function to the registry after resolving specific information about this function.
func (r *TaskTargetRegistry) Add(targetFunc interface{}) (*TaskTarget, error) {
fv := reflect.ValueOf(targetFunc)
if fv.Kind() != reflect.Func {
return nil, fmt.Errorf("Provided function value is not an actual function")
}
name := runtime.FuncForPC(fv.Pointer()).Name()
funcInstance, err := r.Get(name)
if err == nil {
return funcInstance, nil
}
taskTarget := &TaskTarget{
Name: name,
targetFunc: targetFunc,
paramTypes: r.resolveParamTypes(targetFunc),
}
r.targetFuncs.Store(name, taskTarget)
return taskTarget, nil
}
// Get returns the TaskTarget instance which holds all information about any single registered task function.
func (r *TaskTargetRegistry) Get(name string) (*TaskTarget, error) {
taskTarget, ok := r.targetFuncs.Load(name)
if ok {
return taskTarget.(*TaskTarget), nil
}
return nil, fmt.Errorf("Function %s not found", name)
}
// Exists checks if a function with provided name exists.
func (r *TaskTargetRegistry) Exists(name string) bool {
_, ok := r.targetFuncs.Load(name)
if ok {
return true
}
return false
}
func (r *TaskTargetRegistry) resolveParamTypes(targetFunc interface{}) map[string]reflect.Type {
paramTypes := make(map[string]reflect.Type)
funcType := reflect.TypeOf(targetFunc)
for idx := 0; idx < funcType.NumIn(); idx++ {
in := funcType.In(idx)
paramTypes[in.Name()] = in
}
return paramTypes
}

View File

@ -0,0 +1,79 @@
package task
import (
"crypto/sha1"
"fmt"
"io"
"reflect"
"time"
)
type TaskSchedule struct {
IsRecurring bool
LastRun time.Time
NextRun time.Time
Duration time.Duration
}
type Task struct {
*TaskSchedule
TaskTarget *TaskTarget
Params []interface{}
}
// New returns an instance of task
func New(taskTarget *TaskTarget, params []interface{}) *Task {
return &Task{
TaskSchedule: &TaskSchedule{},
TaskTarget: taskTarget,
Params: params,
}
}
// NewWithSchedule creates an instance of task with the provided schedule information
func NewWithSchedule(taskTarget *TaskTarget, params []interface{}, taskSchedule *TaskSchedule) *Task {
return &Task{
TaskSchedule: taskSchedule,
TaskTarget: taskTarget,
Params: params,
}
}
// IsDue returns a boolean indicating whether the task should execute or not
func (t *Task) IsDue() bool {
timeNow := time.Now()
return timeNow == t.NextRun || timeNow.After(t.NextRun)
}
// Run will execute the task and schedule it's next run.
func (t *Task) Run() {
// Reschedule task first to prevent running the task
// again in case the execution time takes more than the
// task's duration value.
t.scheduleNextRun()
fv := reflect.ValueOf(t.TaskTarget.targetFunc)
params := make([]reflect.Value, len(t.Params))
for i, param := range t.Params {
params[i] = reflect.ValueOf(param)
}
fv.Call(params)
}
// Hash will return the SHA1 representation of the task's data.
func (t *Task) Hash() string {
hash := sha1.New()
_, _ = io.WriteString(hash, t.TaskTarget.Name)
_, _ = io.WriteString(hash, fmt.Sprintf("%+v", t.Params))
_, _ = io.WriteString(hash, fmt.Sprintf("%s", t.TaskSchedule.Duration))
_, _ = io.WriteString(hash, fmt.Sprintf("%t", t.TaskSchedule.IsRecurring))
return fmt.Sprintf("%x", hash.Sum(nil))
}
func (t *Task) scheduleNextRun() {
if !t.IsRecurring {
return
}
t.LastRun = t.NextRun
t.NextRun = t.NextRun.Add(t.Duration)
}