This commit is contained in:
crusader 2017-09-28 15:08:43 +09:00
parent 67b3f1426c
commit 0c819ad340
4 changed files with 96 additions and 73 deletions

View File

@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"path"
"time"
lfcc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
@ -79,8 +78,7 @@ func (h *authHandlers) Serve() error {
}
h.c = client.New()
h.c.OnNotify(module.NoAuthProbeService_AcceptNoAuthProbe, h.onNoAuthProbeAccept)
h.c.OnNotify(module.NoAuthProbeService_DenyNoauthProbe, h.onNoAuthProbeDeny)
h.c.OnNotify(h.onNotify)
var err error
if nil != h.noAuthConfig.TempKey && "" != *h.noAuthConfig.TempKey {
@ -150,32 +148,3 @@ func (h *authHandlers) serveConnect(noAuthTempKey string) error {
return nil
}
func (h *authHandlers) onNoAuthProbeAccept(method string, params interface{}) {
var err error
ps := params.([]string)
probeID := ps[0]
if lfcc.Exists(h.probeConfigPath) {
if err = lfcc.Load(&h.probeConfig, h.probeConfigPath); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", h.probeConfigPath, err))
}
}
h.probeConfig.ID = &probeID
if err = lfcc.Save(h.probeConfig, h.probeConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of Probe config file[%s] failed error[%v]", h.probeConfigPath, err))
}
h.acceptedChan <- true
}
func (h *authHandlers) onNoAuthProbeDeny(method string, params interface{}) {
n := time.Now()
h.noAuthConfig.DenyDate = &n
if err := lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err))
}
h.deniedChan <- fmt.Errorf("This probe have been denied from overFlow")
}

51
auth/on_notify.go Normal file
View File

@ -0,0 +1,51 @@
package auth
import (
"fmt"
"time"
lfcc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module"
)
func (h *authHandlers) onNotify(method string, params interface{}) {
switch method {
case module.NoAuthProbeService_AcceptNoAuthProbe:
h.onNoAuthProbeAccept(params)
break
case module.NoAuthProbeService_DenyNoauthProbe:
h.onNoAuthProbeDeny(params)
break
}
}
func (h *authHandlers) onNoAuthProbeAccept(params interface{}) {
var err error
ps := params.([]string)
probeID := ps[0]
if lfcc.Exists(h.probeConfigPath) {
if err = lfcc.Load(&h.probeConfig, h.probeConfigPath); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", h.probeConfigPath, err))
}
}
h.probeConfig.ID = &probeID
if err = lfcc.Save(h.probeConfig, h.probeConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of Probe config file[%s] failed error[%v]", h.probeConfigPath, err))
}
h.acceptedChan <- true
}
func (h *authHandlers) onNoAuthProbeDeny(params interface{}) {
n := time.Now()
h.noAuthConfig.DenyDate = &n
if err := lfcc.Save(h.noAuthConfig, h.noAuthConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", h.noAuthConfigPath, err))
}
h.deniedChan <- fmt.Errorf("This probe have been denied from overFlow")
}

View File

@ -54,31 +54,30 @@ type Client interface {
Dial(url string, header http.Header, readBufSize int, writeBufSize int) (*http.Response, error)
Call(method string, args interface{}, result interface{}) error
Notify(method string, args interface{}) error
OnNotify(method string, cb OnNotifyFunc)
OnNotify(cb OnNotifyFunc)
OnClose(cb OnCloseFunc)
Shutdown(ctx context.Context) error
}
type client struct {
conn *websocket.Conn
sendMutex sync.Mutex
request protocol.Request
notification protocol.Notification
mutex sync.Mutex
requestID uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
onNotifyHandlers map[string][]OnNotifyFunc
onCloseHandlers []OnCloseFunc
conn *websocket.Conn
sendMutex sync.Mutex
request protocol.Request
notification protocol.Notification
mutex sync.Mutex
requestID uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
onNotifyHandler OnNotifyFunc
onCloseHandlers []OnCloseFunc
}
func New() Client {
c := &client{
requestID: 0,
pending: make(map[uint64]*Call),
onNotifyHandlers: make(map[string][]OnNotifyFunc),
onCloseHandlers: make([]OnCloseFunc, 1),
requestID: 0,
pending: make(map[uint64]*Call),
onCloseHandlers: make([]OnCloseFunc, 1),
}
return c
@ -123,15 +122,8 @@ func (c *client) Notify(method string, args interface{}) error {
return nil
}
func (c *client) OnNotify(method string, cb OnNotifyFunc) {
var hs []OnNotifyFunc
var ok bool
if hs, ok = c.onNotifyHandlers[method]; !ok {
hs = make([]OnNotifyFunc, 1)
c.onNotifyHandlers[method] = hs
}
hs = append(hs, cb)
func (c *client) OnNotify(cb OnNotifyFunc) {
c.onNotifyHandler = cb
}
func (c *client) OnClose(cb OnCloseFunc) {
@ -214,21 +206,24 @@ func (c *client) input() {
var res protocol.Response
var noti protocol.Notification
var messageType int
var reader io.Reader
var buff []byte
for err == nil {
res = protocol.Response{}
if messageType, reader, err = c.conn.NextReader(); nil != err {
break
if messageType, buff, err = c.conn.ReadMessage(); nil != err {
logging.Logger.Error(fmt.Sprintf("Client: Reader error[%v]", err))
continue
}
logging.Logger.Debug(fmt.Sprintf("Client: messageType:%d", messageType))
if err = json.NewDecoder(reader).Decode(res); nil != err {
if err = json.Unmarshal(buff, &res); nil != err {
noti = protocol.Notification{}
if err = json.NewDecoder(reader).Decode(noti); nil != err {
break
if err = json.Unmarshal(buff, &noti); nil != err {
logging.Logger.Error(fmt.Sprintf("Client: Decode error[%v]", err))
continue
} else {
err = c.onNotification(noti)
c.onNotification(noti)
}
} else {
err = c.onResponse(res)
@ -290,17 +285,12 @@ func (c *client) onResponse(res protocol.Response) error {
return err
}
func (c *client) onNotification(noti protocol.Notification) error {
var err error
var hs []OnNotifyFunc
var ok bool
if hs, ok = c.onNotifyHandlers[noti.Method]; ok {
for _, h := range hs {
h(noti.Method, noti.Params)
}
func (c *client) onNotification(noti protocol.Notification) {
if nil == c.onNotifyHandler {
return
}
return err
c.onNotifyHandler(noti.Method, noti.Params)
}
func (c *client) connCloseHandler(code int, text string) error {

View File

@ -102,6 +102,7 @@ func (p *probe) connectToCentral() error {
return err
}
logging.Logger.Debug(fmt.Sprintf("Probe: Connect Probe HTTP Status[%s]", res.Status))
p.probeClient.OnNotify(p.onNotify)
p.metricsClient = client.New()
if res, err = p.metricsClient.Dial(p.metricsEntryURL, header, 4096, 4096); nil != err {
@ -111,3 +112,15 @@ func (p *probe) connectToCentral() error {
return nil
}
func (p *probe) onNotify(method string, params interface{}) {
switch method {
case module.NoAuthProbeService_AcceptNoAuthProbe:
break
case module.NoAuthProbeService_DenyNoauthProbe:
break
}
}