queue pushItem method create

This commit is contained in:
geek 2017-05-15 17:52:28 +09:00
parent 22de0f11bf
commit 780efc645c

View File

@ -2,7 +2,6 @@ package queue
import ( import (
"container/heap" "container/heap"
"loafle.com/overflow/agent_api/observer"
"log" "log"
"sync" "sync"
"time" "time"
@ -16,11 +15,9 @@ type Item struct {
type LoafleQueue struct { type LoafleQueue struct {
mtx sync.Mutex mtx sync.Mutex
items []*Item items []*Item
queueType string
interval time.Duration interval time.Duration
size int size int
eventChanel chan interface{}
senderChanel chan interface{} senderChanel chan interface{}
} }
@ -86,36 +83,34 @@ func (lq LoafleQueue) newItem(value interface{}) *Item {
} }
} }
func (lq *LoafleQueue) notifyEventHandler(c chan interface{}) { func (lq *LoafleQueue) PushItem(v interface{}) {
for data := range c {
it := lq.newItem(data) it := lq.newItem(v)
heap.Push(lq, it) heap.Push(lq, it)
}
} }
func (lq *LoafleQueue) Close() { func (lq *LoafleQueue) Close() {
if lq.eventChanel != nil { //if lq.eventChanel != nil {
observer.Remove(lq.queueType, lq.eventChanel) // observer.Remove(lq.queueType, lq.eventChanel)
} //}
lq.items = nil lq.items = nil
} }
func NewQueue(eventType string, interval time.Duration, senderChanel chan interface{}) *LoafleQueue {
func NewQueue(interval time.Duration, senderChanel chan interface{}) *LoafleQueue {
items := make([]*Item, 0) items := make([]*Item, 0)
event := make(chan interface{}, 0)
log.Println("NewQueu Start") log.Println("NewQueu Start")
lq := &LoafleQueue{ lq := &LoafleQueue{
items: items, items: items,
queueType: eventType,
interval: interval, interval: interval,
eventChanel: event,
senderChanel: senderChanel, senderChanel: senderChanel,
} }
heap.Init(lq) heap.Init(lq)
observer.Add(eventType, lq.eventChanel) //observer.Add(eventType, lq.eventChanel)
go lq.notifyEventHandler(event) //go lq.notifyEventHandler(event)
go lq.getItems() go lq.getItems()
return lq return lq