queue channel added and mutex added

This commit is contained in:
geek 2017-04-27 15:23:04 +09:00
parent 47fa25a4f4
commit 55f8a0d30f
2 changed files with 62 additions and 22 deletions

View File

@ -3,6 +3,7 @@ package queue
import ( import (
"container/heap" "container/heap"
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"sync"
"time" "time"
) )
@ -12,11 +13,13 @@ type Item struct {
} }
type LoafleQueue struct { type LoafleQueue struct {
mtx sync.Mutex
items []*Item items []*Item
queueType observer.KEY queueType string
interval time.Duration interval time.Duration
eventChanel chan interface{} eventChanel chan interface{}
senderChanel chan interface{}
} }
func (lq LoafleQueue) Len() int { func (lq LoafleQueue) Len() int {
@ -32,18 +35,30 @@ func (lq LoafleQueue) Swap(i, j int) {
lq.items[i], lq.items[j] = lq.items[j], lq.items[i] lq.items[i], lq.items[j] = lq.items[j], lq.items[i]
} }
func (lq *LoafleQueue) GetItems() []*Item { func (lq *LoafleQueue) getItems() {
time.Sleep(time.Second * lq.interval) for {
resultItems := make([]*Item, 0)
for lq.Len() > 0 { time.Sleep(time.Second * lq.interval)
item := heap.Pop(lq).(*Item) resultItems := make([]*Item, 0)
resultItems = append(resultItems, item) if lq.Len() > 0 {
lq.mtx.Lock()
for i := 0; i < lq.Len(); i++ {
item := heap.Pop(lq).(*Item)
resultItems = append(resultItems, item)
}
lq.mtx.Unlock()
lq.senderChanel <- resultItems
}
} }
return resultItems
} }
func (lq *LoafleQueue) Push(i interface{}) { func (lq *LoafleQueue) Push(i interface{}) {
lq.mtx.Lock()
defer lq.mtx.Unlock()
n := len(lq.items) n := len(lq.items)
nItem := i.(*Item) nItem := i.(*Item)
nItem.Priority = n nItem.Priority = n
@ -72,24 +87,26 @@ func (lq *LoafleQueue) notifyEventHandler(c chan interface{}) {
func (lq *LoafleQueue) Close() { func (lq *LoafleQueue) Close() {
if lq.eventChanel != nil { if lq.eventChanel != nil {
observer.Remove(lq.queueType.String(), lq.eventChanel) observer.Remove(lq.queueType, lq.eventChanel)
} }
} }
func NewQueue(eventType observer.KEY, interval time.Duration) *LoafleQueue { func NewQueue(eventType string, interval time.Duration, senderChanel chan interface{}) *LoafleQueue {
items := make([]*Item, 0) items := make([]*Item, 0)
event := make(chan interface{}, 0) event := make(chan interface{}, 0)
lq := &LoafleQueue{ lq := &LoafleQueue{
items: items, items: items,
queueType: eventType, queueType: eventType,
interval: interval, interval: interval,
eventChanel: event, eventChanel: event,
senderChanel: senderChanel,
} }
heap.Init(lq) heap.Init(lq)
observer.Add(eventType.String(), lq.eventChanel) observer.Add(eventType, lq.eventChanel)
go lq.notifyEventHandler(event) go lq.notifyEventHandler(event)
go lq.getItems()
return lq return lq
} }

View File

@ -3,18 +3,41 @@ package queue
import ( import (
"fmt" "fmt"
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
"reflect"
"testing" "testing"
"time"
) )
func TestNewQueue(t *testing.T) { func TestNewQueue(t *testing.T) {
lq := NewQueue(observer.DATA_QUEUE, 5) senderChanel := make(chan interface{}, 0)
observer.Notify(observer.DATA_QUEUE.String(), "data") lq := NewQueue(messages.QUEUE_DATA, 5, senderChanel)
re := lq.GetItems() fmt.Println(lq)
for it, val := range re { observer.Notify(messages.QUEUE_DATA, "data")
fmt.Println(val) observer.Notify(messages.QUEUE_DATA, "data1111")
fmt.Println(it)
go GetItemTest(senderChanel)
time.Sleep(time.Second * 10)
lq.Close()
close(senderChanel)
}
func GetItemTest(senderChanel chan interface{}) {
for sc := range senderChanel {
items := reflect.ValueOf(sc)
if items.Kind() != reflect.Slice {
fmt.Println("ddddd")
}
for i := 0; i < items.Len(); i++ {
d := items.Index(i)
fmt.Println(d)
}
} }
} }