diff --git a/queue.go b/queue.go index 59245f6..429d776 100644 --- a/queue.go +++ b/queue.go @@ -3,6 +3,7 @@ package queue import ( "container/heap" "loafle.com/overflow/agent_api/observer" + "sync" "time" ) @@ -12,11 +13,13 @@ type Item struct { } type LoafleQueue struct { + mtx sync.Mutex items []*Item - queueType observer.KEY + queueType string interval time.Duration - eventChanel chan interface{} + eventChanel chan interface{} + senderChanel chan interface{} } func (lq LoafleQueue) Len() int { @@ -32,18 +35,30 @@ func (lq LoafleQueue) Swap(i, j int) { lq.items[i], lq.items[j] = lq.items[j], lq.items[i] } -func (lq *LoafleQueue) GetItems() []*Item { +func (lq *LoafleQueue) getItems() { - time.Sleep(time.Second * lq.interval) - resultItems := make([]*Item, 0) - for lq.Len() > 0 { - item := heap.Pop(lq).(*Item) - resultItems = append(resultItems, item) + for { + + time.Sleep(time.Second * lq.interval) + resultItems := make([]*Item, 0) + if lq.Len() > 0 { + lq.mtx.Lock() + for i := 0; i < lq.Len(); i++ { + item := heap.Pop(lq).(*Item) + resultItems = append(resultItems, item) + } + lq.mtx.Unlock() + + lq.senderChanel <- resultItems + } } - return resultItems + } func (lq *LoafleQueue) Push(i interface{}) { + lq.mtx.Lock() + defer lq.mtx.Unlock() + n := len(lq.items) nItem := i.(*Item) nItem.Priority = n @@ -72,24 +87,26 @@ func (lq *LoafleQueue) notifyEventHandler(c chan interface{}) { func (lq *LoafleQueue) Close() { if lq.eventChanel != nil { - observer.Remove(lq.queueType.String(), lq.eventChanel) + observer.Remove(lq.queueType, lq.eventChanel) } } -func NewQueue(eventType observer.KEY, interval time.Duration) *LoafleQueue { +func NewQueue(eventType string, interval time.Duration, senderChanel chan interface{}) *LoafleQueue { items := make([]*Item, 0) event := make(chan interface{}, 0) lq := &LoafleQueue{ - items: items, - queueType: eventType, - interval: interval, - eventChanel: event, + items: items, + queueType: eventType, + interval: interval, + eventChanel: event, + senderChanel: senderChanel, } heap.Init(lq) - observer.Add(eventType.String(), lq.eventChanel) + observer.Add(eventType, lq.eventChanel) go lq.notifyEventHandler(event) + go lq.getItems() return lq } diff --git a/queue_test.go b/queue_test.go index 1a5c7c1..00d3ea6 100644 --- a/queue_test.go +++ b/queue_test.go @@ -3,18 +3,41 @@ package queue import ( "fmt" "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" + "reflect" "testing" + "time" ) func TestNewQueue(t *testing.T) { - lq := NewQueue(observer.DATA_QUEUE, 5) + senderChanel := make(chan interface{}, 0) - observer.Notify(observer.DATA_QUEUE.String(), "data") + lq := NewQueue(messages.QUEUE_DATA, 5, senderChanel) - re := lq.GetItems() + fmt.Println(lq) - for it, val := range re { - fmt.Println(val) - fmt.Println(it) + observer.Notify(messages.QUEUE_DATA, "data") + observer.Notify(messages.QUEUE_DATA, "data1111") + + go GetItemTest(senderChanel) + + time.Sleep(time.Second * 10) + + lq.Close() + close(senderChanel) +} + +func GetItemTest(senderChanel chan interface{}) { + + for sc := range senderChanel { + items := reflect.ValueOf(sc) + if items.Kind() != reflect.Slice { + fmt.Println("ddddd") + } + + for i := 0; i < items.Len(); i++ { + d := items.Index(i) + fmt.Println(d) + } } }