Type
This commit is contained in:
parent
64d8250a48
commit
3689c51b74
34
queue.go
34
queue.go
|
@ -3,17 +3,9 @@ package queue
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
"loafle.com/overflow/agent_api/observer"
|
"loafle.com/overflow/agent_api/observer"
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
DATA_TYPE EventType = 0
|
|
||||||
EVENT_TYPE EventType = 1
|
|
||||||
)
|
|
||||||
|
|
||||||
type Item struct {
|
type Item struct {
|
||||||
Value interface{}
|
Value interface{}
|
||||||
Priority int
|
Priority int
|
||||||
|
@ -21,7 +13,7 @@ type Item struct {
|
||||||
|
|
||||||
type LoafleQueue struct {
|
type LoafleQueue struct {
|
||||||
items []*Item
|
items []*Item
|
||||||
queueType string
|
queueType observer.KEY
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
|
|
||||||
eventChanel chan interface{}
|
eventChanel chan interface{}
|
||||||
|
@ -78,41 +70,25 @@ func (lq *LoafleQueue) notifyEventHandler(c chan interface{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func StringType(tp EventType) string {
|
|
||||||
var tempType string
|
|
||||||
|
|
||||||
if tp == DATA_TYPE {
|
|
||||||
tempType = "DATA_TYPE"
|
|
||||||
} else if tp == EVENT_TYPE {
|
|
||||||
tempType = "EVENT_TYPE"
|
|
||||||
} else {
|
|
||||||
log.Fatal("Event Type Error")
|
|
||||||
}
|
|
||||||
|
|
||||||
return tempType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lq *LoafleQueue) Close() {
|
func (lq *LoafleQueue) Close() {
|
||||||
if lq.eventChanel != nil {
|
if lq.eventChanel != nil {
|
||||||
observer.Remove(lq.queueType, lq.eventChanel)
|
observer.Remove(lq.queueType.String(), lq.eventChanel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func NewQueue(eventType EventType, interval time.Duration) *LoafleQueue {
|
func NewQueue(eventType observer.KEY, interval time.Duration) *LoafleQueue {
|
||||||
items := make([]*Item, 0)
|
items := make([]*Item, 0)
|
||||||
event := make(chan interface{}, 0)
|
event := make(chan interface{}, 0)
|
||||||
|
|
||||||
tempType := StringType(eventType)
|
|
||||||
|
|
||||||
lq := &LoafleQueue{
|
lq := &LoafleQueue{
|
||||||
items: items,
|
items: items,
|
||||||
queueType: tempType,
|
queueType: eventType,
|
||||||
interval: interval,
|
interval: interval,
|
||||||
eventChanel: event,
|
eventChanel: event,
|
||||||
}
|
}
|
||||||
|
|
||||||
heap.Init(lq)
|
heap.Init(lq)
|
||||||
|
|
||||||
observer.Add(lq.queueType, lq.eventChanel)
|
observer.Add(eventType.String(), lq.eventChanel)
|
||||||
go lq.notifyEventHandler(event)
|
go lq.notifyEventHandler(event)
|
||||||
|
|
||||||
return lq
|
return lq
|
||||||
|
|
|
@ -7,10 +7,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNewQueue(t *testing.T) {
|
func TestNewQueue(t *testing.T) {
|
||||||
lq := NewQueue(EVENT_TYPE, 5)
|
lq := NewQueue(observer.DATA_QUEUE, 5)
|
||||||
typeEvent := StringType(EVENT_TYPE)
|
|
||||||
|
|
||||||
observer.Notify(typeEvent, "data")
|
observer.Notify(observer.DATA_QUEUE.String(), "data")
|
||||||
|
|
||||||
re := lq.GetItems()
|
re := lq.GetItems()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user