agent_api/observer/observer.go
jackdaw@loafle.com 0ad3637ab4 .
2017-04-14 15:29:54 +09:00

91 lines
1.5 KiB
Go

package observer
import (
"errors"
"sync"
)
// ErrEventNotFound is returned by Remove and Notify when no channel is
// registered under the requested event id.
var ErrEventNotFound = errors.New("event not found")

// errNilChannel is returned by Add when asked to register a nil channel.
var errNilChannel = errors.New("nil channel")

// observer is a mutex-guarded registry that maps an event id to the channels
// subscribed to it. The zero value is ready to use.
type observer struct {
	mtx    sync.Mutex                    // guards events
	events map[string][]chan interface{} // event id -> subscribed channels
}

// Add subscribes ch to the event id.
//
// The same channel may be registered more than once under an id; it then
// receives each notification once per registration. Add returns an error only
// when ch is nil: a send on a nil channel blocks forever, so a nil subscriber
// would hang the first Notify for id while it holds the lock.
func (o *observer) Add(id string, ch chan interface{}) error {
	if ch == nil {
		return errNilChannel
	}

	o.mtx.Lock()
	defer o.mtx.Unlock()

	// Allocate lazily so that a zero-value observer does not panic on a
	// write to a nil map.
	if o.events == nil {
		o.events = make(map[string][]chan interface{})
	}
	// append on a missing key starts from a nil slice, so no separate
	// "first subscriber" branch is needed.
	o.events[id] = append(o.events[id], ch)
	return nil
}

// Remove unsubscribes every registration of rch under id and closes rch.
//
// It returns ErrEventNotFound when nothing is registered under id. When id
// exists but rch is not among its subscribers, nothing is changed and nil is
// returned. Once the last subscriber of id is removed the id itself is
// dropped, so later Remove/Notify calls for it report ErrEventNotFound.
//
// Note that rch is closed here even if it is still registered under another
// id; a later Notify for that other id would then panic on the closed channel.
func (o *observer) Remove(id string, rch chan interface{}) error {
	o.mtx.Lock()
	defer o.mtx.Unlock()

	chans, ok := o.events[id]
	if !ok {
		return ErrEventNotFound
	}

	kept := make([]chan interface{}, 0, len(chans))
	found := false
	for _, ch := range chans {
		if ch == rch {
			found = true
			continue
		}
		kept = append(kept, ch)
	}

	// Close exactly once, after the scan: closing inside the loop would
	// panic with "close of closed channel" when rch was registered twice.
	if found {
		close(rch)
	}

	if len(kept) == 0 {
		delete(o.events, id)
	} else {
		o.events[id] = kept
	}
	return nil
}

// Notify sends arg to every channel subscribed to id, in registration order.
//
// It returns ErrEventNotFound when nothing is registered under id. Each send
// is a plain blocking channel send performed while the mutex is held: holding
// the lock keeps Remove from closing a channel in the middle of a send, but it
// also means a subscriber that is not receiving stalls Notify, Add and Remove
// until it does.
func (o *observer) Notify(id string, arg interface{}) error {
	o.mtx.Lock()
	defer o.mtx.Unlock()

	chans, ok := o.events[id]
	if !ok {
		return ErrEventNotFound
	}
	for _, ch := range chans {
		ch <- arg
	}
	return nil
}
// _observer is the package-wide observer that the package-level Add, Remove
// and Notify functions delegate to.
//
// It is built by a variable initializer rather than inside init(): package
// initialization already runs exactly once, so no sync.Once guard is needed,
// and variable initializers run before every init() function of the package,
// so the instance is already usable from init functions in any file.
var _observer = &observer{
	events: make(map[string][]chan interface{}),
}
// Package-level interface: the functions below delegate to the shared _observer.

// Add registers ch under the event id on the package-level observer, so that
// values passed to Notify for that id are sent to ch. It returns whatever the
// underlying (*observer).Add returns.
func Add(id string, ch chan interface{}) error {
return _observer.Add(id, ch)
}
// Remove unregisters rch from the event id on the package-level observer.
// The observer closes rch when it removes it. An error is returned when
// nothing is registered under id.
func Remove(id string, rch chan interface{}) error {
return _observer.Remove(id, rch)
}
// Notify sends arg to every channel registered under the event id on the
// package-level observer. The sends are blocking channel sends made while the
// observer's mutex is held, so a receiver that is not draining its channel
// stalls this call. An error is returned when nothing is registered under id.
func Notify(id string, arg interface{}) error {
return _observer.Notify(id, arg)
}