This commit is contained in:
geek 2017-04-27 17:38:35 +09:00
parent 430e993a68
commit 44a4819096
2 changed files with 25 additions and 4 deletions

View File

@ -17,6 +17,7 @@ type LoafleQueue struct {
items []*Item items []*Item
queueType string queueType string
interval time.Duration interval time.Duration
size int
eventChanel chan interface{} eventChanel chan interface{}
senderChanel chan interface{} senderChanel chan interface{}
@ -40,16 +41,19 @@ func (lq *LoafleQueue) getItems() {
for { for {
time.Sleep(time.Second * lq.interval) time.Sleep(time.Second * lq.interval)
resultItems := make([]*Item, 0)
if lq.Len() > 0 { if lq.Len() > 0 {
lq.size = lq.Len()
resultItems := make([]*Item, 0)
lq.mtx.Lock() lq.mtx.Lock()
for i := 0; i < lq.Len(); i++ { for i := 0; i < lq.size; i++ {
item := heap.Pop(lq).(*Item) item := heap.Pop(lq).(*Item)
resultItems = append(resultItems, item) resultItems = append(resultItems, item)
} }
lq.mtx.Unlock() lq.mtx.Unlock()
lq.senderChanel <- resultItems lq.senderChanel <- resultItems
lq.size = lq.Len()
} }
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages" "loafle.com/overflow/agent_api/observer/messages"
"log"
"reflect" "reflect"
"testing" "testing"
"time" "time"
@ -18,6 +19,21 @@ func TestNewQueue(t *testing.T) {
observer.Notify(messages.QUEUE_DATA, "data") observer.Notify(messages.QUEUE_DATA, "data")
observer.Notify(messages.QUEUE_DATA, "data1111") observer.Notify(messages.QUEUE_DATA, "data1111")
observer.Notify(messages.QUEUE_DATA, "data2222")
observer.Notify(messages.QUEUE_DATA, "data3333")
observer.Notify(messages.QUEUE_DATA, "data4444")
observer.Notify(messages.QUEUE_DATA, "data5555")
observer.Notify(messages.QUEUE_DATA, "data6666")
observer.Notify(messages.QUEUE_DATA, "data7777")
observer.Notify(messages.QUEUE_DATA, "data8888")
observer.Notify(messages.QUEUE_DATA, "data9999")
observer.Notify(messages.QUEUE_DATA, "data1111")
observer.Notify(messages.QUEUE_DATA, "data2222")
observer.Notify(messages.QUEUE_DATA, "data3333")
observer.Notify(messages.QUEUE_DATA, "data4444")
observer.Notify(messages.QUEUE_DATA, "data5555")
observer.Notify(messages.QUEUE_DATA, "data6666")
observer.Notify(messages.QUEUE_DATA, "data7777")
go GetItemTest(senderChanel) go GetItemTest(senderChanel)
@ -34,10 +50,11 @@ func GetItemTest(senderChanel chan interface{}) {
if items.Kind() != reflect.Slice { if items.Kind() != reflect.Slice {
fmt.Println("ddddd") fmt.Println("ddddd")
} }
//log.Println(items.Len())
for i := 0; i < items.Len(); i++ { for i := 0; i < items.Len(); i++ {
d := items.Index(i) d := items.Index(i)
fmt.Println(d) log.Println(d)
} }
//fmt.Println(sc)
} }
} }