package observer import ( "errors" "sync" ) type Observer interface { Add(id string, ch chan interface{}) error Remove(id string, rch chan interface{}) error Notify(id string, arg interface{}) error } type observer struct { mtx sync.Mutex events map[string][]chan interface{} } func (o *observer) Add(id string, ch chan interface{}) error { o.mtx.Lock() defer o.mtx.Unlock() _, ok := o.events[id] if ok { o.events[id] = append(o.events[id], ch) } else { arr := make([]chan interface{}, 0) arr = append(arr, ch) o.events[id] = arr } return nil } func (o *observer) Remove(id string, rch chan interface{}) error { o.mtx.Lock() defer o.mtx.Unlock() newArr := make([]chan interface{}, 0) chans, ok := o.events[id] if !ok { return errors.New("event not found") } for _, ch := range chans { if ch != rch { newArr = append(newArr, ch) } else { close(ch) ch = nil } } if len(newArr) == 0 { delete(o.events,id) } else { o.events[id] = newArr } return nil } func (o *observer) Notify(id string, arg interface{}) error { o.mtx.Lock() defer o.mtx.Unlock() chans, ok := o.events[id] if !ok { return errors.New("event not found") } for _, ch := range chans { ch <- arg } return nil } var _observer *observer var once sync.Once func init() { once.Do(func() { _observer = &observer{ events: make(map[string][]chan interface{}), } }) } // interface func GetInstance() Observer { return _observer } func Add(id string, ch chan interface{}) error { return _observer.Add(id, ch) } func Remove(id string, rch chan interface{}) error { return _observer.Remove(id, rch) } func Notify(id string, arg interface{}) error { return _observer.Notify(id, arg) }