diff --git a/time/scheduler/scheduler.go b/time/scheduler/scheduler.go new file mode 100644 index 0000000..4b5275d --- /dev/null +++ b/time/scheduler/scheduler.go @@ -0,0 +1,217 @@ +package scheduler + +import ( + "fmt" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "git.loafle.net/overflow/probe/service/scheduler/storage" + "git.loafle.net/overflow/probe/service/scheduler/task" +) + +type Scheduler struct { + taskTargetRegistry *task.TaskTargetRegistry + + tasks sync.Map + adapter *storageAdapter + + stopChan chan bool +} + +// New will return a new instance of the Scheduler struct. +func New(taskStorage storage.TaskStorage) *Scheduler { + taskTargetRegistry := &task.TaskTargetRegistry{} + + return &Scheduler{ + taskTargetRegistry: taskTargetRegistry, + stopChan: make(chan bool), + adapter: &storageAdapter{ + taskStorage: taskStorage, + taskTargetRegistry: taskTargetRegistry, + }, + } +} + +// RunAt will schedule function to be executed once at the given time. +func (s *Scheduler) RunAt(time time.Time, targetFunc interface{}, params ...interface{}) (string, error) { + taskTarget, err := s.taskTargetRegistry.Add(targetFunc) + if err != nil { + return "", err + } + + task := task.New(taskTarget, params) + task.NextRun = time + s.registerTask(task) + + return task.Hash(), nil +} + +// RunAfter executes function once after a specific duration has elapsed. +func (s *Scheduler) RunAfter(duration time.Duration, targetFunc interface{}, params ...interface{}) (string, error) { + return s.RunAt(time.Now().Add(duration), targetFunc, params...) +} + +// RunEvery will schedule function to be executed every time the duration has elapsed. +func (s *Scheduler) RunEvery(duration time.Duration, targetFunc interface{}, params ...interface{}) (string, error) { + taskTarget, err := s.taskTargetRegistry.Add(targetFunc) + if err != nil { + return "", err + } + + task := task.New(taskTarget, params) + task.IsRecurring = true + task.Duration = duration + task.NextRun = time.Now().Add(duration) + s.registerTask(task) + + return task.Hash(), nil +} + +// Start will run the scheduler's timer and will trigger the execution +// of tasks depending on their schedule. +func (s *Scheduler) Start() error { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Populate tasks from storage + if err := s.populateTasks(); err != nil { + return err + } + if err := s.persistRegisteredTasks(); err != nil { + return err + } + s.runPending() + + go func() { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + s.runPending() + case <-sigChan: + s.stopChan <- true + case <-s.stopChan: + close(s.stopChan) + } + } + }() + + return nil +} + +// Stop will put the scheduler to halt +func (s *Scheduler) Stop() { + s.stopChan <- true +} + +// Wait is a convenience function for blocking until the scheduler is stopped. +func (s *Scheduler) Wait() { + <-s.stopChan +} + +// Cancel is used to cancel the planned execution of a specific task using it's ID. +// The ID is returned when the task was scheduled using RunAt, RunAfter or RunEvery +func (s *Scheduler) Cancel(taskID string) error { + t, ok := s.tasks.Load(taskID) + if !ok { + return fmt.Errorf("Task not found") + } + + s.adapter.Remove(t.(*task.Task)) + s.tasks.Delete(taskID) + return nil +} + +// Clear will cancel the execution and clear all registered tasks. +func (s *Scheduler) Clear() { + s.tasks.Range(func(_taskID, _task interface{}) bool { + s.adapter.Remove(_task.(*task.Task)) + s.tasks.Delete(_taskID.(string)) + return true + }) + + s.taskTargetRegistry = &task.TaskTargetRegistry{} +} + +func (s *Scheduler) populateTasks() error { + tasks, err := s.adapter.Fetch() + if err != nil { + return err + } + + for _, _task := range tasks { + // If we can't find the function, it's been changed/removed by user + exists := s.taskTargetRegistry.Exists(_task.TaskTarget.Name) + if !exists { + log.Printf("%s was not found, it will be removed\n", _task.TaskTarget.Name) + _ = s.adapter.Remove(_task) + continue + } + + // If the task instance is still registered with the same computed hash then move on. + // Otherwise, one of the attributes changed and therefore, the task instance should + // be added to the list of tasks to be executed with the stored params + _eTask, ok := s.tasks.Load(_task.Hash()) + if !ok { + log.Printf("Detected a change in attributes of one of the instances of task %s, \n", + _task.TaskTarget.Name) + _task.TaskTarget, _ = s.taskTargetRegistry.Get(_task.TaskTarget.Name) + _eTask = _task + s.tasks.Store(_task.Hash(), _eTask) + } + eTask := _eTask.(*task.Task) + + // Skip task which is not a recurring one and the NextRun has already passed + if !_task.IsRecurring && _task.NextRun.Before(time.Now()) { + // We might have a task instance which was executed already. + // In this case, delete it. + _ = s.adapter.Remove(_task) + s.tasks.Delete(_task.Hash()) + continue + } + + // Duration may have changed for recurring tasks + if _task.IsRecurring && eTask.Duration != _task.Duration { + // Reschedule NextRun based on _task.LastRun + registeredTask.Duration + eTask.NextRun = _task.LastRun.Add(eTask.Duration) + } + } + return nil +} + +func (s *Scheduler) persistRegisteredTasks() error { + var err error + s.tasks.Range(func(_taskID, _task interface{}) bool { + err = s.adapter.Add(_task.(*task.Task)) + if nil != err { + return false + } + return true + }) + + return err +} + +func (s *Scheduler) runPending() { + s.tasks.Range(func(_taskID, _task interface{}) bool { + t := _task.(*task.Task) + if t.IsDue() { + go t.Run() + if !t.IsRecurring { + s.adapter.Remove(t) + s.tasks.Delete(t.Hash()) + } + } + return true + }) +} + +func (s *Scheduler) registerTask(task *task.Task) { + s.taskTargetRegistry.Add(task.TaskTarget) + + s.tasks.Store(task.Hash(), task) +} diff --git a/time/scheduler/scheduler_test.go b/time/scheduler/scheduler_test.go new file mode 100644 index 0000000..5e85244 --- /dev/null +++ b/time/scheduler/scheduler_test.go @@ -0,0 +1,41 @@ +package scheduler_test + +import ( + "log" + "testing" + "time" + + "git.loafle.net/overflow/probe/service/scheduler" + "git.loafle.net/overflow/probe/service/scheduler/storage" +) + +func TestDiscoverPort(t *testing.T) { + _storage := storage.NewMemoryStorage() + s := scheduler.New(_storage) + + // Start a task with arguments + tID1, err := s.RunEvery(1*time.Second, TaskWithArgs, "Hello from recurring task 1") + if err != nil { + log.Fatal(err) + } + + // Start the same task as above with a different argument + _, err = s.RunEvery(4*time.Second, TaskWithArgs, "Hello from recurring task 2") + if err != nil { + log.Fatal(err) + } + s.Start() + + time.Sleep(4 * time.Second) + s.Cancel(tID1) + + s.Wait() +} + +func TaskWithoutArgs() { + log.Println("TaskWithoutArgs is executed") +} + +func TaskWithArgs(message string) { + log.Println("TaskWithArgs is executed. message:", message) +} diff --git a/time/scheduler/storage.go b/time/scheduler/storage.go new file mode 100644 index 0000000..4eeb253 --- /dev/null +++ b/time/scheduler/storage.go @@ -0,0 +1,141 @@ +package scheduler + +import ( + "encoding/json" + "reflect" + "strconv" + "strings" + "time" + + "git.loafle.net/overflow/probe/service/scheduler/storage" + "git.loafle.net/overflow/probe/service/scheduler/task" +) + +type storageAdapter struct { + taskStorage storage.TaskStorage + taskTargetRegistry *task.TaskTargetRegistry +} + +func (a *storageAdapter) Add(task *task.Task) error { + ta, err := a.getTaskAttribute(task) + if err != nil { + return err + } + return a.taskStorage.Add(ta) +} + +func (a *storageAdapter) Fetch() ([]*task.Task, error) { + tas, err := a.taskStorage.Fetch() + if err != nil { + return nil, err + } + var tasks []*task.Task + for _, ta := range tas { + lastRun, err := time.Parse(time.RFC3339, ta.LastRun) + if err != nil { + return nil, err + } + + nextRun, err := time.Parse(time.RFC3339, ta.NextRun) + if err != nil { + return nil, err + } + + duration, err := time.ParseDuration(ta.Duration) + if err != nil { + return nil, err + } + + isRecurring, err := strconv.Atoi(ta.IsRecurring) + if err != nil { + return nil, err + } + + taskTarget, err := a.taskTargetRegistry.Get(ta.Name) + if err != nil { + return nil, err + } + + params, err := paramsFromString(taskTarget, ta.Params) + if err != nil { + return nil, err + } + + t := task.NewWithSchedule(taskTarget, params, &task.TaskSchedule{ + IsRecurring: isRecurring == 1, + Duration: time.Duration(duration), + LastRun: lastRun, + NextRun: nextRun, + }) + tasks = append(tasks, t) + } + return tasks, nil +} + +func (a *storageAdapter) Remove(task *task.Task) error { + ta, err := a.getTaskAttribute(task) + if err != nil { + return err + } + return a.taskStorage.Remove(ta) +} + +func (a *storageAdapter) getTaskAttribute(task *task.Task) (*storage.TaskAttribute, error) { + params, err := paramsToString(task.Params) + if err != nil { + return nil, err + } + + isRecurring := 0 + if task.IsRecurring { + isRecurring = 1 + } + + return &storage.TaskAttribute{ + Hash: string(task.Hash()), + Name: task.TaskTarget.Name, + LastRun: task.LastRun.Format(time.RFC3339), + NextRun: task.NextRun.Format(time.RFC3339), + Duration: task.Duration.String(), + IsRecurring: strconv.Itoa(isRecurring), + Params: params, + }, nil +} + +func paramsToString(params []interface{}) (string, error) { + var paramsList []string + for _, param := range params { + paramStr, err := json.Marshal(param) + if err != nil { + return "", err + } + paramsList = append(paramsList, string(paramStr)) + } + data, err := json.Marshal(paramsList) + return string(data), err +} + +func paramsFromString(taskTarget *task.TaskTarget, payload string) ([]interface{}, error) { + var params []interface{} + if strings.TrimSpace(payload) == "" { + return params, nil + } + paramTypes := taskTarget.Params() + var paramsStrings []string + err := json.Unmarshal([]byte(payload), ¶msStrings) + if err != nil { + return params, err + } + for i, paramStr := range paramsStrings { + paramType := paramTypes[i] + target := reflect.New(paramType) + err := json.Unmarshal([]byte(paramStr), target.Interface()) + if err != nil { + return params, err + } + param := reflect.Indirect(target).Interface() + params = append(params, param) + } + + return params, nil +} diff --git a/time/scheduler/storage/memory.go b/time/scheduler/storage/memory.go new file mode 100644 index 0000000..3fb12b6 --- /dev/null +++ b/time/scheduler/storage/memory.go @@ -0,0 +1,35 @@ +package storage + +// MemoryStorage is a memory task store +type MemoryStorage struct { + taskAttributes []*TaskAttribute +} + +// NewMemoryStorage returns an instance of MemoryStorage. +func NewMemoryStorage() *MemoryStorage { + return &MemoryStorage{} +} + +// Add adds a task to the memory store. +func (s *MemoryStorage) Add(taskAttribute *TaskAttribute) error { + s.taskAttributes = append(s.taskAttributes, taskAttribute) + return nil +} + +// Fetch will return all tasks stored. +func (s *MemoryStorage) Fetch() ([]*TaskAttribute, error) { + return s.taskAttributes, nil +} + +// Remove will remove task from store +func (s *MemoryStorage) Remove(taskAttribute *TaskAttribute) error { + var newTaskAttributes []*TaskAttribute + for _, eTaskAttribute := range s.taskAttributes { + if taskAttribute.Hash == eTaskAttribute.Hash { + continue + } + newTaskAttributes = append(newTaskAttributes, eTaskAttribute) + } + s.taskAttributes = newTaskAttributes + return nil +} diff --git a/time/scheduler/storage/storage.go b/time/scheduler/storage/storage.go new file mode 100644 index 0000000..d72ac40 --- /dev/null +++ b/time/scheduler/storage/storage.go @@ -0,0 +1,18 @@ +package storage + +type TaskAttribute struct { + Hash string + Name string + LastRun string + NextRun string + Duration string + IsRecurring string + Params string +} + +// TaskStorage is the interface to implement when adding custom task storage. +type TaskStorage interface { + Add(*TaskAttribute) error + Fetch() ([]*TaskAttribute, error) + Remove(*TaskAttribute) error +} diff --git a/time/scheduler/task/registry.go b/time/scheduler/task/registry.go new file mode 100644 index 0000000..ded5cd2 --- /dev/null +++ b/time/scheduler/task/registry.go @@ -0,0 +1,81 @@ +package task + +import ( + "fmt" + "reflect" + "runtime" + "sync" +) + +type TaskTarget struct { + Name string + targetFunc interface{} + paramTypes map[string]reflect.Type +} + +// Params returns the list of parameter types +func (t *TaskTarget) Params() []reflect.Type { + ft := reflect.TypeOf(t.targetFunc) + paramTypes := make([]reflect.Type, ft.NumIn()) + for idx := 0; idx < ft.NumIn(); idx++ { + in := ft.In(idx) + paramTypes[idx] = in + } + return paramTypes +} + +type TaskTargetRegistry struct { + targetFuncs sync.Map +} + +// Add appends the function to the registry after resolving specific information about this function. +func (r *TaskTargetRegistry) Add(targetFunc interface{}) (*TaskTarget, error) { + fv := reflect.ValueOf(targetFunc) + if fv.Kind() != reflect.Func { + return nil, fmt.Errorf("Provided function value is not an actual function") + } + + name := runtime.FuncForPC(fv.Pointer()).Name() + funcInstance, err := r.Get(name) + if err == nil { + return funcInstance, nil + } + + taskTarget := &TaskTarget{ + Name: name, + targetFunc: targetFunc, + paramTypes: r.resolveParamTypes(targetFunc), + } + + r.targetFuncs.Store(name, taskTarget) + + return taskTarget, nil +} + +// Get returns the TaskTarget instance which holds all information about any single registered task function. +func (r *TaskTargetRegistry) Get(name string) (*TaskTarget, error) { + taskTarget, ok := r.targetFuncs.Load(name) + if ok { + return taskTarget.(*TaskTarget), nil + } + return nil, fmt.Errorf("Function %s not found", name) +} + +// Exists checks if a function with provided name exists. +func (r *TaskTargetRegistry) Exists(name string) bool { + _, ok := r.targetFuncs.Load(name) + if ok { + return true + } + return false +} + +func (r *TaskTargetRegistry) resolveParamTypes(targetFunc interface{}) map[string]reflect.Type { + paramTypes := make(map[string]reflect.Type) + funcType := reflect.TypeOf(targetFunc) + for idx := 0; idx < funcType.NumIn(); idx++ { + in := funcType.In(idx) + paramTypes[in.Name()] = in + } + return paramTypes +} diff --git a/time/scheduler/task/task.go b/time/scheduler/task/task.go new file mode 100644 index 0000000..d7a25b6 --- /dev/null +++ b/time/scheduler/task/task.go @@ -0,0 +1,79 @@ +package task + +import ( + "crypto/sha1" + "fmt" + "io" + "reflect" + "time" +) + +type TaskSchedule struct { + IsRecurring bool + LastRun time.Time + NextRun time.Time + Duration time.Duration +} + +type Task struct { + *TaskSchedule + TaskTarget *TaskTarget + Params []interface{} +} + +// New returns an instance of task +func New(taskTarget *TaskTarget, params []interface{}) *Task { + return &Task{ + TaskSchedule: &TaskSchedule{}, + TaskTarget: taskTarget, + Params: params, + } +} + +// NewWithSchedule creates an instance of task with the provided schedule information +func NewWithSchedule(taskTarget *TaskTarget, params []interface{}, taskSchedule *TaskSchedule) *Task { + return &Task{ + TaskSchedule: taskSchedule, + TaskTarget: taskTarget, + Params: params, + } +} + +// IsDue returns a boolean indicating whether the task should execute or not +func (t *Task) IsDue() bool { + timeNow := time.Now() + return timeNow == t.NextRun || timeNow.After(t.NextRun) +} + +// Run will execute the task and schedule it's next run. +func (t *Task) Run() { + // Reschedule task first to prevent running the task + // again in case the execution time takes more than the + // task's duration value. + t.scheduleNextRun() + + fv := reflect.ValueOf(t.TaskTarget.targetFunc) + params := make([]reflect.Value, len(t.Params)) + for i, param := range t.Params { + params[i] = reflect.ValueOf(param) + } + fv.Call(params) +} + +// Hash will return the SHA1 representation of the task's data. +func (t *Task) Hash() string { + hash := sha1.New() + _, _ = io.WriteString(hash, t.TaskTarget.Name) + _, _ = io.WriteString(hash, fmt.Sprintf("%+v", t.Params)) + _, _ = io.WriteString(hash, fmt.Sprintf("%s", t.TaskSchedule.Duration)) + _, _ = io.WriteString(hash, fmt.Sprintf("%t", t.TaskSchedule.IsRecurring)) + return fmt.Sprintf("%x", hash.Sum(nil)) +} + +func (t *Task) scheduleNextRun() { + if !t.IsRecurring { + return + } + t.LastRun = t.NextRun + t.NextRun = t.NextRun.Add(t.Duration) +}