getItems edited
This commit is contained in:
parent
6b8b6f6189
commit
53ce7418af
6
queue.go
6
queue.go
|
@ -3,6 +3,7 @@ package queue
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"loafle.com/overflow/agent_api/observer"
|
"loafle.com/overflow/agent_api/observer"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
@ -37,6 +38,7 @@ func (lq LoafleQueue) Swap(i, j int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lq *LoafleQueue) getItems() {
|
func (lq *LoafleQueue) getItems() {
|
||||||
|
resultItems := make([]*Item, 0)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
@ -44,7 +46,7 @@ func (lq *LoafleQueue) getItems() {
|
||||||
|
|
||||||
if lq.Len() > 0 {
|
if lq.Len() > 0 {
|
||||||
lq.size = lq.Len()
|
lq.size = lq.Len()
|
||||||
resultItems := make([]*Item, 0)
|
|
||||||
lq.mtx.Lock()
|
lq.mtx.Lock()
|
||||||
for i := 0; i < lq.size; i++ {
|
for i := 0; i < lq.size; i++ {
|
||||||
item := heap.Pop(lq).(*Item)
|
item := heap.Pop(lq).(*Item)
|
||||||
|
@ -54,6 +56,8 @@ func (lq *LoafleQueue) getItems() {
|
||||||
|
|
||||||
lq.senderChanel <- resultItems
|
lq.senderChanel <- resultItems
|
||||||
lq.size = lq.Len()
|
lq.size = lq.Len()
|
||||||
|
//log.Println("result length: ", len(resultItems))
|
||||||
|
resultItems = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
func TestNewQueue(t *testing.T) {
|
func TestNewQueue(t *testing.T) {
|
||||||
senderChanel := make(chan interface{}, 0)
|
senderChanel := make(chan interface{}, 0)
|
||||||
|
|
||||||
lq := NewQueue(messages.QUEUE_DATA, 5, senderChanel)
|
lq := NewQueue(messages.QUEUE_DATA, 3, senderChanel)
|
||||||
|
|
||||||
log.Println(lq)
|
log.Println(lq)
|
||||||
|
|
||||||
|
@ -26,6 +26,11 @@ func TestNewQueue(t *testing.T) {
|
||||||
observer.Notify(messages.QUEUE_DATA, "data7777")
|
observer.Notify(messages.QUEUE_DATA, "data7777")
|
||||||
observer.Notify(messages.QUEUE_DATA, "data8888")
|
observer.Notify(messages.QUEUE_DATA, "data8888")
|
||||||
observer.Notify(messages.QUEUE_DATA, "data9999")
|
observer.Notify(messages.QUEUE_DATA, "data9999")
|
||||||
|
|
||||||
|
go GetItemTest(senderChanel)
|
||||||
|
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
|
|
||||||
observer.Notify(messages.QUEUE_DATA, "data1111")
|
observer.Notify(messages.QUEUE_DATA, "data1111")
|
||||||
observer.Notify(messages.QUEUE_DATA, "data2222")
|
observer.Notify(messages.QUEUE_DATA, "data2222")
|
||||||
observer.Notify(messages.QUEUE_DATA, "data3333")
|
observer.Notify(messages.QUEUE_DATA, "data3333")
|
||||||
|
@ -34,8 +39,6 @@ func TestNewQueue(t *testing.T) {
|
||||||
observer.Notify(messages.QUEUE_DATA, "data6666")
|
observer.Notify(messages.QUEUE_DATA, "data6666")
|
||||||
observer.Notify(messages.QUEUE_DATA, "data7777")
|
observer.Notify(messages.QUEUE_DATA, "data7777")
|
||||||
|
|
||||||
go GetItemTest(senderChanel)
|
|
||||||
|
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
|
|
||||||
lq.Close()
|
lq.Close()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user