agent_api/observer/observer.go
jackdaw@loafle.com f958610bbf .
2017-04-17 13:43:43 +09:00

102 lines
1.7 KiB
Go

package observer
import (
"errors"
"sync"
)
// Observer is a registry of channels keyed by an event id. The method notes
// below describe the behavior of the observer implementation in this file.
type Observer interface {
// Add registers ch as a receiver for the event identified by id.
Add(id string, ch chan interface{}) error
// Remove unregisters rch from the event identified by id; the implementation
// in this file also closes rch. It returns an error when id has no
// registered channels.
Remove(id string, rch chan interface{}) error
// Notify sends arg to every channel registered for id. It returns an error
// when id has no registered channels.
Notify(id string, arg interface{}) error
}
// observer is the mutex-guarded Observer implementation. It maps an event id
// to the channels subscribed to that id.
type observer struct {
	mtx    sync.Mutex
	events map[string][]chan interface{}
}

// Add subscribes ch to the event identified by id and always returns nil.
// The same channel may be added more than once; it then receives each
// notification once per registration.
func (o *observer) Add(id string, ch chan interface{}) error {
	o.mtx.Lock()
	defer o.mtx.Unlock()

	// Writing to a nil map panics, so make a zero-value observer usable.
	if o.events == nil {
		o.events = make(map[string][]chan interface{})
	}
	// append copes with a missing key: the lookup yields a nil slice, which
	// grows like any other.
	o.events[id] = append(o.events[id], ch)
	return nil
}

// Remove unsubscribes every registration of rch under id and closes rch.
//
// It returns an error when no channel is registered for id. When id exists
// but rch is not subscribed to it, nothing changes and nil is returned. Once
// the last channel of id is removed, the id itself is dropped, so a later
// Notify or Remove for it reports "event not found".
//
// Caveat: rch is closed even if it is still registered under a different id;
// a subsequent Notify for that other id would panic on the closed channel.
func (o *observer) Remove(id string, rch chan interface{}) error {
	o.mtx.Lock()
	defer o.mtx.Unlock()

	chans, ok := o.events[id]
	if !ok {
		return errors.New("event not found")
	}

	kept := make([]chan interface{}, 0, len(chans))
	closed := false
	for _, ch := range chans {
		if ch != rch {
			kept = append(kept, ch)
			continue
		}
		// rch may have been added several times; closing a channel twice
		// (or closing a nil channel) panics, so close it at most once.
		if !closed && ch != nil {
			close(ch)
			closed = true
		}
	}

	if len(kept) == 0 {
		delete(o.events, id)
	} else {
		o.events[id] = kept
	}
	return nil
}

// Notify sends arg to every channel subscribed to id, in registration order.
// It returns an error when no channel is registered for id.
//
// Each send blocks until the channel accepts the value, and the mutex is held
// for the whole call, so concurrent Add, Remove and Notify calls wait until
// all subscribers have taken (or buffered) arg. Holding the lock is also what
// prevents Remove from closing a channel in the middle of a send.
func (o *observer) Notify(id string, arg interface{}) error {
	o.mtx.Lock()
	defer o.mtx.Unlock()

	chans, ok := o.events[id]
	if !ok {
		return errors.New("event not found")
	}
	for _, ch := range chans {
		ch <- arg
	}
	return nil
}
// Package-level state: the shared observer behind GetInstance and the
// package-level Add/Remove/Notify helpers, plus the Once guarding its setup.
var (
	_observer *observer
	once      sync.Once
)

// init creates the shared observer with an empty event table. The Once guard
// ensures the construction runs a single time.
func init() {
	once.Do(func() {
		table := make(map[string][]chan interface{})
		_observer = &observer{events: table}
	})
}
// GetInstance returns the package-level observer (the one created in init)
// as an Observer.
func GetInstance() Observer {
return _observer
}
// Add registers ch for the event id on the package-level observer; it
// forwards to that observer's Add method and returns its result.
func Add(id string, ch chan interface{}) error {
return _observer.Add(id, ch)
}
// Remove unregisters rch from the event id on the package-level observer; it
// forwards to that observer's Remove method and returns its result.
func Remove(id string, rch chan interface{}) error {
return _observer.Remove(id, rch)
}
// Notify delivers arg to the channels registered for id on the package-level
// observer; it forwards to that observer's Notify method and returns its
// result.
func Notify(id string, arg interface{}) error {
return _observer.Notify(id, arg)
}