package queue import ( "container/heap" log "github.com/cihub/seelog" "sync" "time" ) type Item struct { Value interface{} Priority int } type LoafleQueue struct { mtx sync.Mutex items []*Item interval time.Duration size int senderChanel chan interface{} } func (lq LoafleQueue) Len() int { return len(lq.items) } func (lq LoafleQueue) Less(i, j int) bool { return lq.items[i].Priority < lq.items[j].Priority } func (lq LoafleQueue) Swap(i, j int) { lq.items[i], lq.items[j] = lq.items[j], lq.items[i] } func (lq *LoafleQueue) getItems() { resultItems := make([]*Item, 0) for { time.Sleep(time.Second * lq.interval) if lq.Len() > 0 { lq.size = lq.Len() lq.mtx.Lock() for i := 0; i < lq.size; i++ { item := heap.Pop(lq).(*Item) resultItems = append(resultItems, item) } lq.mtx.Unlock() lq.senderChanel <- resultItems lq.size = lq.Len() resultItems = nil } } } func (lq *LoafleQueue) Push(i interface{}) { lq.mtx.Lock() defer lq.mtx.Unlock() n := len(lq.items) nItem := i.(*Item) nItem.Priority = n lq.items = append(lq.items, nItem) } func (lq *LoafleQueue) Pop() interface{} { old := lq.items n := len(old) nItem := old[n-1] lq.items = old[0 : n-1] return nItem } func (lq LoafleQueue) newItem(value interface{}) *Item { return &Item{ Value: value, } } func (lq *LoafleQueue) PushItem(v interface{}) { it := lq.newItem(v) heap.Push(lq, it) } func (lq *LoafleQueue) Close() { //if lq.eventChanel != nil { // observer.Remove(lq.queueType, lq.eventChanel) //} lq.items = nil } func NewQueue(interval time.Duration, senderChanel chan interface{}) *LoafleQueue { items := make([]*Item, 0) log.Info("New queue has started.") lq := &LoafleQueue{ items: items, interval: interval, senderChanel: senderChanel, } heap.Init(lq) //observer.Add(eventType, lq.eventChanel) //go lq.notifyEventHandler(event) go lq.getItems() return lq }