first probe
This commit is contained in:
117
queue/queue.go
Normal file
117
queue/queue.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
Value interface{}
|
||||
Priority int
|
||||
}
|
||||
|
||||
type LoafleQueue struct {
|
||||
mtx sync.Mutex
|
||||
items []*Item
|
||||
interval time.Duration
|
||||
size int
|
||||
|
||||
senderChanel chan interface{}
|
||||
}
|
||||
|
||||
func (lq LoafleQueue) Len() int {
|
||||
return len(lq.items)
|
||||
}
|
||||
|
||||
func (lq LoafleQueue) Less(i, j int) bool {
|
||||
|
||||
return lq.items[i].Priority < lq.items[j].Priority
|
||||
}
|
||||
|
||||
func (lq LoafleQueue) Swap(i, j int) {
|
||||
lq.items[i], lq.items[j] = lq.items[j], lq.items[i]
|
||||
}
|
||||
|
||||
func (lq *LoafleQueue) getItems() {
|
||||
resultItems := make([]*Item, 0)
|
||||
|
||||
for {
|
||||
|
||||
time.Sleep(time.Second * lq.interval)
|
||||
|
||||
if lq.Len() > 0 {
|
||||
lq.size = lq.Len()
|
||||
|
||||
lq.mtx.Lock()
|
||||
for i := 0; i < lq.size; i++ {
|
||||
item := heap.Pop(lq).(*Item)
|
||||
resultItems = append(resultItems, item)
|
||||
}
|
||||
lq.mtx.Unlock()
|
||||
|
||||
lq.senderChanel <- resultItems
|
||||
lq.size = lq.Len()
|
||||
//log.Println("result length: ", len(resultItems))
|
||||
resultItems = nil
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (lq *LoafleQueue) Push(i interface{}) {
|
||||
lq.mtx.Lock()
|
||||
defer lq.mtx.Unlock()
|
||||
|
||||
n := len(lq.items)
|
||||
nItem := i.(*Item)
|
||||
nItem.Priority = n
|
||||
lq.items = append(lq.items, nItem)
|
||||
}
|
||||
|
||||
func (lq *LoafleQueue) Pop() interface{} {
|
||||
old := lq.items
|
||||
n := len(old)
|
||||
nItem := old[n-1]
|
||||
lq.items = old[0 : n-1]
|
||||
return nItem
|
||||
}
|
||||
func (lq LoafleQueue) newItem(value interface{}) *Item {
|
||||
return &Item{
|
||||
Value: value,
|
||||
}
|
||||
}
|
||||
|
||||
func (lq *LoafleQueue) PushItem(v interface{}) {
|
||||
|
||||
it := lq.newItem(v)
|
||||
heap.Push(lq, it)
|
||||
|
||||
}
|
||||
|
||||
func (lq *LoafleQueue) Close() {
|
||||
//if lq.eventChanel != nil {
|
||||
// observer.Remove(lq.queueType, lq.eventChanel)
|
||||
//}
|
||||
lq.items = nil
|
||||
}
|
||||
|
||||
func NewQueue(interval time.Duration, senderChanel chan interface{}) *LoafleQueue {
|
||||
items := make([]*Item, 0)
|
||||
|
||||
log.Println("NewQueu Start")
|
||||
lq := &LoafleQueue{
|
||||
items: items,
|
||||
interval: interval,
|
||||
senderChanel: senderChanel,
|
||||
}
|
||||
|
||||
heap.Init(lq)
|
||||
|
||||
//observer.Add(eventType, lq.eventChanel)
|
||||
//go lq.notifyEventHandler(event)
|
||||
go lq.getItems()
|
||||
|
||||
return lq
|
||||
}
|
||||
64
queue/queue_test.go
Normal file
64
queue/queue_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"git.loafle.net/overflow/agent_api/observer"
|
||||
"git.loafle.net/overflow/agent_api/observer/messages"
|
||||
"log"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewQueue(t *testing.T) {
|
||||
senderChanel := make(chan interface{}, 0)
|
||||
|
||||
lq := NewQueue(messages.QUEUE_DATA, 3, senderChanel)
|
||||
|
||||
log.Println(lq)
|
||||
|
||||
observer.Notify(messages.QUEUE_DATA, "data")
|
||||
observer.Notify(messages.QUEUE_DATA, "data1111")
|
||||
observer.Notify(messages.QUEUE_DATA, "data2222")
|
||||
observer.Notify(messages.QUEUE_DATA, "data3333")
|
||||
observer.Notify(messages.QUEUE_DATA, "data4444")
|
||||
observer.Notify(messages.QUEUE_DATA, "data5555")
|
||||
observer.Notify(messages.QUEUE_DATA, "data6666")
|
||||
observer.Notify(messages.QUEUE_DATA, "data7777")
|
||||
observer.Notify(messages.QUEUE_DATA, "data8888")
|
||||
observer.Notify(messages.QUEUE_DATA, "data9999")
|
||||
|
||||
go GetItemTest(senderChanel)
|
||||
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
observer.Notify(messages.QUEUE_DATA, "data1111")
|
||||
observer.Notify(messages.QUEUE_DATA, "data2222")
|
||||
observer.Notify(messages.QUEUE_DATA, "data3333")
|
||||
observer.Notify(messages.QUEUE_DATA, "data4444")
|
||||
observer.Notify(messages.QUEUE_DATA, "data5555")
|
||||
observer.Notify(messages.QUEUE_DATA, "data6666")
|
||||
observer.Notify(messages.QUEUE_DATA, "data7777")
|
||||
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
lq.Close()
|
||||
close(senderChanel)
|
||||
}
|
||||
|
||||
func GetItemTest(senderChanel chan interface{}) {
|
||||
|
||||
for sc := range senderChanel {
|
||||
items := reflect.ValueOf(sc)
|
||||
if items.Kind() != reflect.Slice {
|
||||
log.Println("ddddd")
|
||||
}
|
||||
//log.Println(items.Len())
|
||||
for i := 0; i < items.Len(); i++ {
|
||||
d := items.Index(i).Elem().Interface()
|
||||
dd := d.(Item)
|
||||
|
||||
log.Printf("data: %s , periorty: %d", dd.Value, dd.Priority)
|
||||
}
|
||||
//fmt.Println(sc)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user