// Package queue buffers values received on a channel registered with the
// observer package and hands them out in batches.
package queue

import (
	"container/heap"
	"sync"
	"time"

	"github.com/prometheus/common/log"
	"loafle.com/overflow/agent_api/observer"
)

// EventType selects the name under which a LoafleQueue registers its channel
// with the observer package.
type EventType int

const (
	// DATA_TYPE makes NewLoafleQueue register under the name "DATA_TYPE".
	DATA_TYPE EventType = 0
	// EVENT_TYPE makes NewLoafleQueue register under the name "EVENT_TYPE".
	EVENT_TYPE EventType = 1
)

// item is a single queued value together with its heap priority.
type item struct {
	value    interface{}
	priority int
}

// LoafleQueue is a min-heap of items ordered by priority. It implements
// heap.Interface; values arrive on eventChanel and are pushed by a background
// goroutine started in NewLoafleQueue.
type LoafleQueue struct {
	// mu guards items. The background receiver and GetItems run on different
	// goroutines and both mutate the heap.
	mu          sync.Mutex
	items       []*item
	queueType   string
	interval    time.Duration
	eventChanel chan interface{}

	// done is closed by Close to stop the background receiver.
	done      chan struct{}
	closeOnce sync.Once
}

// Len reports the number of queued items. It is part of heap.Interface and
// does not take mu itself, because container/heap calls it while the caller
// already holds the lock.
func (lq *LoafleQueue) Len() int {
	return len(lq.items)
}

// Less reports whether item i has a lower priority value than item j.
// It is part of heap.Interface.
func (lq *LoafleQueue) Less(i, j int) bool {
	return lq.items[i].priority < lq.items[j].priority
}

// Swap exchanges items i and j. It is part of heap.Interface.
func (lq *LoafleQueue) Swap(i, j int) {
	lq.items[i], lq.items[j] = lq.items[j], lq.items[i]
}

// GetItems sleeps for the configured interval and then removes and returns
// every queued item in ascending priority order. The returned slice is never
// nil.
//
// NOTE: interval is multiplied by time.Second, so it is interpreted as a
// count of seconds (pass 5, not 5*time.Second).
func (lq *LoafleQueue) GetItems() []*item {
	// Sleep before taking the lock so the receiver can keep pushing meanwhile.
	time.Sleep(time.Second * lq.interval)

	lq.mu.Lock()
	defer lq.mu.Unlock()

	drained := make([]*item, 0, lq.Len())
	for lq.Len() > 0 {
		drained = append(drained, heap.Pop(lq).(*item))
	}
	return drained
}

// Push appends i, which must be an *item, and sets its priority to the
// number of items that were queued before it. It is part of heap.Interface;
// use heap.Push rather than calling it directly.
func (lq *LoafleQueue) Push(i interface{}) {
	entry := i.(*item)
	entry.priority = len(lq.items)
	lq.items = append(lq.items, entry)
}

// Pop removes and returns the last element of the underlying slice. It is
// part of heap.Interface; use heap.Pop rather than calling it directly.
func (lq *LoafleQueue) Pop() interface{} {
	last := len(lq.items) - 1
	entry := lq.items[last]
	// Clear the vacated slot so the backing array does not keep the item alive.
	lq.items[last] = nil
	lq.items = lq.items[:last]
	return entry
}

// newItem wraps value in an item with a zero priority; Push assigns the real
// priority.
func (lq *LoafleQueue) newItem(value interface{}) *item {
	return &item{value: value}
}

// notifyEventHandler pushes every value received on c onto the heap. It
// returns when c is closed or when Close has been called.
func (lq *LoafleQueue) notifyEventHandler(c chan interface{}) {
	for {
		select {
		case data, ok := <-c:
			if !ok {
				return
			}
			lq.mu.Lock()
			heap.Push(lq, lq.newItem(data))
			lq.mu.Unlock()
		case <-lq.done:
			// A nil done channel (queue not built by NewLoafleQueue) never
			// fires, which keeps the loop running until c is closed.
			return
		}
	}
}

// Close unregisters the queue's channel from the observer package and stops
// the background receiver. Items already queued remain available through
// GetItems.
func (lq *LoafleQueue) Close() {
	if lq.eventChanel != nil {
		observer.Remove(lq.queueType, lq.eventChanel)
	}
	// Stop the receiver only after unregistering.
	// NOTE(review): this assumes observer.Remove guarantees no further sends
	// on the channel once it returns; confirm against the observer package.
	if lq.done != nil {
		lq.closeOnce.Do(func() { close(lq.done) })
	}
}

// NewLoafleQueue creates a queue, registers its channel with the observer
// package under the name matching eventType, and starts the goroutine that
// moves received values onto the heap. interval is the number of seconds
// GetItems waits before draining (see GetItems).
//
// An eventType other than DATA_TYPE or EVENT_TYPE is reported through
// log.Fatal.
func NewLoafleQueue(eventType EventType, interval time.Duration) *LoafleQueue {
	var queueType string
	switch eventType {
	case DATA_TYPE:
		queueType = "DATA_TYPE"
	case EVENT_TYPE:
		queueType = "EVENT_TYPE"
	default:
		log.Fatal("Event Type Error")
	}

	lq := &LoafleQueue{
		items:       make([]*item, 0),
		queueType:   queueType,
		interval:    interval,
		eventChanel: make(chan interface{}),
		done:        make(chan struct{}),
	}

	observer.Add(lq.queueType, lq.eventChanel)
	go lq.notifyEventHandler(lq.eventChanel)

	return lq
}