diff --git a/auth/auth.go b/auth/auth.go index 281ce17..d907117 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "path" - "time" lfcc "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" @@ -79,8 +78,7 @@ func (h *authHandlers) Serve() error { } h.c = client.New() - h.c.OnNotify(module.NoAuthProbeService_AcceptNoAuthProbe, h.onNoAuthProbeAccept) - h.c.OnNotify(module.NoAuthProbeService_DenyNoauthProbe, h.onNoAuthProbeDeny) + h.c.OnNotify(h.onNotify) var err error if nil != h.noAuthConfig.TempKey && "" != *h.noAuthConfig.TempKey { @@ -150,32 +148,3 @@ func (h *authHandlers) serveConnect(noAuthTempKey string) error { return nil } - -func (h *authHandlers) onNoAuthProbeAccept(method string, params interface{}) { - var err error - ps := params.([]string) - probeID := ps[0] - - if lfcc.Exists(h.probeConfigPath) { - if err = lfcc.Load(&h.probeConfig, h.probeConfigPath); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", h.probeConfigPath, err)) - } - } - - h.probeConfig.ID = &probeID - if err = lfcc.Save(h.probeConfig, h.probeConfigPath, true); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: Saving of Probe config file[%s] failed error[%v]", h.probeConfigPath, err)) - } - - h.acceptedChan <- true -} - -func (h *authHandlers) onNoAuthProbeDeny(method string, params interface{}) { - n := time.Now() - h.noAuthConfig.DenyDate = &n - if err := lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err)) - } - - h.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") -} diff --git a/auth/on_notify.go b/auth/on_notify.go new file mode 100644 index 0000000..c05280e --- /dev/null +++ b/auth/on_notify.go @@ -0,0 +1,51 @@ +package auth + +import ( + "fmt" + "time" + + lfcc "git.loafle.net/commons_go/config" + "git.loafle.net/commons_go/logging" + "git.loafle.net/overflow/overflow_probes/central/api/module" +) + +func (h *authHandlers) onNotify(method string, params interface{}) { + switch method { + case module.NoAuthProbeService_AcceptNoAuthProbe: + h.onNoAuthProbeAccept(params) + break + case module.NoAuthProbeService_DenyNoauthProbe: + h.onNoAuthProbeDeny(params) + break + } + +} + +func (h *authHandlers) onNoAuthProbeAccept(params interface{}) { + var err error + ps := params.([]string) + probeID := ps[0] + + if lfcc.Exists(h.probeConfigPath) { + if err = lfcc.Load(&h.probeConfig, h.probeConfigPath); nil != err { + logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", h.probeConfigPath, err)) + } + } + + h.probeConfig.ID = &probeID + if err = lfcc.Save(h.probeConfig, h.probeConfigPath, true); nil != err { + logging.Logger.Error(fmt.Sprintf("Auth: Saving of Probe config file[%s] failed error[%v]", h.probeConfigPath, err)) + } + + h.acceptedChan <- true +} + +func (h *authHandlers) onNoAuthProbeDeny(params interface{}) { + n := time.Now() + h.noAuthConfig.DenyDate = &n + if err := lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err { + logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err)) + } + + h.deniedChan <- fmt.Errorf("This probe have been denied from overFlow") +} diff --git a/central/client/client.go b/central/client/client.go index 7611b0d..617cf9a 100644 --- a/central/client/client.go +++ b/central/client/client.go @@ -54,31 +54,30 @@ type Client interface { Dial(url string, header http.Header, readBufSize int, writeBufSize int) (*http.Response, error) Call(method string, args interface{}, result interface{}) error Notify(method string, args interface{}) error - OnNotify(method string, cb OnNotifyFunc) + OnNotify(cb OnNotifyFunc) OnClose(cb OnCloseFunc) Shutdown(ctx context.Context) error } type client struct { - conn *websocket.Conn - sendMutex sync.Mutex - request protocol.Request - notification protocol.Notification - mutex sync.Mutex - requestID uint64 - pending map[uint64]*Call - closing bool // user has called Close - shutdown bool // server has told us to stop - onNotifyHandlers map[string][]OnNotifyFunc - onCloseHandlers []OnCloseFunc + conn *websocket.Conn + sendMutex sync.Mutex + request protocol.Request + notification protocol.Notification + mutex sync.Mutex + requestID uint64 + pending map[uint64]*Call + closing bool // user has called Close + shutdown bool // server has told us to stop + onNotifyHandler OnNotifyFunc + onCloseHandlers []OnCloseFunc } func New() Client { c := &client{ - requestID: 0, - pending: make(map[uint64]*Call), - onNotifyHandlers: make(map[string][]OnNotifyFunc), - onCloseHandlers: make([]OnCloseFunc, 1), + requestID: 0, + pending: make(map[uint64]*Call), + onCloseHandlers: make([]OnCloseFunc, 1), } return c @@ -123,15 +122,8 @@ func (c *client) Notify(method string, args interface{}) error { return nil } -func (c *client) OnNotify(method string, cb OnNotifyFunc) { - var hs []OnNotifyFunc - var ok bool - if hs, ok = c.onNotifyHandlers[method]; !ok { - hs = make([]OnNotifyFunc, 1) - c.onNotifyHandlers[method] = hs - } - - hs = append(hs, cb) +func (c *client) OnNotify(cb OnNotifyFunc) { + c.onNotifyHandler = cb } func (c *client) OnClose(cb OnCloseFunc) { @@ -214,21 +206,24 @@ func (c *client) input() { var res protocol.Response var noti protocol.Notification var messageType int - var reader io.Reader + var buff []byte for err == nil { res = protocol.Response{} - if messageType, reader, err = c.conn.NextReader(); nil != err { - break + + if messageType, buff, err = c.conn.ReadMessage(); nil != err { + logging.Logger.Error(fmt.Sprintf("Client: Reader error[%v]", err)) + continue } logging.Logger.Debug(fmt.Sprintf("Client: messageType:%d", messageType)) - if err = json.NewDecoder(reader).Decode(res); nil != err { + if err = json.Unmarshal(buff, &res); nil != err { noti = protocol.Notification{} - if err = json.NewDecoder(reader).Decode(noti); nil != err { - break + if err = json.Unmarshal(buff, ¬i); nil != err { + logging.Logger.Error(fmt.Sprintf("Client: Decode error[%v]", err)) + continue } else { - err = c.onNotification(noti) + c.onNotification(noti) } } else { err = c.onResponse(res) @@ -290,17 +285,12 @@ func (c *client) onResponse(res protocol.Response) error { return err } -func (c *client) onNotification(noti protocol.Notification) error { - var err error - var hs []OnNotifyFunc - var ok bool - if hs, ok = c.onNotifyHandlers[noti.Method]; ok { - for _, h := range hs { - h(noti.Method, noti.Params) - } +func (c *client) onNotification(noti protocol.Notification) { + if nil == c.onNotifyHandler { + return } - return err + c.onNotifyHandler(noti.Method, noti.Params) } func (c *client) connCloseHandler(code int, text string) error { diff --git a/probe/probe.go b/probe/probe.go index 8c8f9f4..1f38375 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -102,6 +102,7 @@ func (p *probe) connectToCentral() error { return err } logging.Logger.Debug(fmt.Sprintf("Probe: Connect Probe HTTP Status[%s]", res.Status)) + p.probeClient.OnNotify(p.onNotify) p.metricsClient = client.New() if res, err = p.metricsClient.Dial(p.metricsEntryURL, header, 4096, 4096); nil != err { @@ -111,3 +112,15 @@ func (p *probe) connectToCentral() error { return nil } + +func (p *probe) onNotify(method string, params interface{}) { + switch method { + case module.NoAuthProbeService_AcceptNoAuthProbe: + + break + case module.NoAuthProbeService_DenyNoauthProbe: + + break + } + +}