agent_api/observer/observer.go

102 lines
1.7 KiB
Go
Raw Normal View History

2017-04-14 04:50:37 +00:00
package observer
2017-04-14 05:27:42 +00:00
import (
"errors"
"sync"
)
2017-04-17 04:43:43 +00:00
type Observer interface {
Add(id string, ch chan interface{}) error
Remove(id string, rch chan interface{}) error
Notify(id string, arg interface{}) error
}
2017-04-14 05:27:42 +00:00
type observer struct {
mtx sync.Mutex
events map[string][]chan interface{}
}
func (o *observer) Add(id string, ch chan interface{}) error {
o.mtx.Lock()
defer o.mtx.Unlock()
_, ok := o.events[id]
if ok {
o.events[id] = append(o.events[id], ch)
} else {
arr := make([]chan interface{}, 0)
arr = append(arr, ch)
o.events[id] = arr
}
return nil
}
func (o *observer) Remove(id string, rch chan interface{}) error {
o.mtx.Lock()
defer o.mtx.Unlock()
newArr := make([]chan interface{}, 0)
chans, ok := o.events[id]
if !ok {
return errors.New("event not found")
}
for _, ch := range chans {
if ch != rch {
newArr = append(newArr, ch)
} else {
close(ch)
2017-04-14 06:28:33 +00:00
ch = nil
2017-04-14 05:27:42 +00:00
}
}
2017-04-14 06:28:33 +00:00
if len(newArr) == 0 {
delete(o.events,id)
} else {
o.events[id] = newArr
}
2017-04-14 05:27:42 +00:00
return nil
}
func (o *observer) Notify(id string, arg interface{}) error {
o.mtx.Lock()
defer o.mtx.Unlock()
chans, ok := o.events[id]
if !ok {
return errors.New("event not found")
}
for _, ch := range chans {
ch <- arg
}
return nil
}
var _observer *observer
var once sync.Once
func init() {
once.Do(func() {
_observer = &observer{
events: make(map[string][]chan interface{}),
}
})
}
2017-04-14 06:28:33 +00:00
// interface
2017-04-17 04:43:43 +00:00
func GetInstance() Observer {
return _observer
}
2017-04-14 05:27:42 +00:00
func Add(id string, ch chan interface{}) error {
return _observer.Add(id, ch)
}
func Remove(id string, rch chan interface{}) error {
return _observer.Remove(id, rch)
}
func Notify(id string, arg interface{}) error {
return _observer.Notify(id, arg)
2017-04-14 06:29:54 +00:00
}