diff --git a/queue.go b/queue.go index 0ad95ce..2fb9568 100644 --- a/queue.go +++ b/queue.go @@ -3,6 +3,7 @@ package queue import ( "container/heap" "loafle.com/overflow/agent_api/observer" + "log" "sync" "time" ) @@ -37,6 +38,7 @@ func (lq LoafleQueue) Swap(i, j int) { } func (lq *LoafleQueue) getItems() { + resultItems := make([]*Item, 0) for { @@ -44,7 +46,7 @@ func (lq *LoafleQueue) getItems() { if lq.Len() > 0 { lq.size = lq.Len() - resultItems := make([]*Item, 0) + lq.mtx.Lock() for i := 0; i < lq.size; i++ { item := heap.Pop(lq).(*Item) @@ -54,6 +56,8 @@ func (lq *LoafleQueue) getItems() { lq.senderChanel <- resultItems lq.size = lq.Len() + //log.Println("result length: ", len(resultItems)) + resultItems = nil } } diff --git a/queue_test.go b/queue_test.go index 8af08e3..02fa06c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -12,7 +12,7 @@ import ( func TestNewQueue(t *testing.T) { senderChanel := make(chan interface{}, 0) - lq := NewQueue(messages.QUEUE_DATA, 5, senderChanel) + lq := NewQueue(messages.QUEUE_DATA, 3, senderChanel) log.Println(lq) @@ -26,6 +26,11 @@ func TestNewQueue(t *testing.T) { observer.Notify(messages.QUEUE_DATA, "data7777") observer.Notify(messages.QUEUE_DATA, "data8888") observer.Notify(messages.QUEUE_DATA, "data9999") + + go GetItemTest(senderChanel) + + time.Sleep(time.Second * 3) + observer.Notify(messages.QUEUE_DATA, "data1111") observer.Notify(messages.QUEUE_DATA, "data2222") observer.Notify(messages.QUEUE_DATA, "data3333") @@ -34,8 +39,6 @@ func TestNewQueue(t *testing.T) { observer.Notify(messages.QUEUE_DATA, "data6666") observer.Notify(messages.QUEUE_DATA, "data7777") - go GetItemTest(senderChanel) - time.Sleep(time.Second * 10) lq.Close()