diff --git a/auth/auth.go b/auth/auth.go index 169514d..3f43e8c 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -1,173 +1,173 @@ package auth import ( - "context" "fmt" - "net/http" "path" + "sync" + "time" - lfcc "git.loafle.net/commons_go/config" + cc "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" - "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/central/client" - "git.loafle.net/overflow/overflow_probes/commons" + ooccn "git.loafle.net/overflow/overflow_commons_go/config/noauthprobe" + noauthprobeM "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe/model" + probeM "git.loafle.net/overflow/overflow_commons_go/modules/probe/model" + oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client" + "git.loafle.net/overflow/overflow_probes/auth/client" + oopai "git.loafle.net/overflow/overflow_probes/auth/info" "git.loafle.net/overflow/overflow_probes/config" - opuu "git.loafle.net/overflow/overflow_probes/util/url" -) - -const ( - noAuthEntryPoint = "/auth" ) type Auther interface { - commons.EndableStarter - commons.Shutdowner + EndableStart(doneChan chan<- error) error + Stop() } type auth struct { - centralClient client.Client - entryURL string + doneChan chan<- error - noAuthConfigPath string - noAuthConfig config.NoAuthProbeConfig + cClient oogwc.Client - endded chan<- error + configPath string + config ooccn.NoAuthProbeConfig - shutdown chan bool - accepted chan bool - denied chan error + tempProbeKeyChan chan string + acceptChan chan *probeM.Probe + denyChan chan *noauthprobeM.NoAuthProbe + + stopChan chan struct{} + stopWg sync.WaitGroup } -func New() (Auther, error) { - var err error +func New() Auther { + a := &auth{} - a := &auth{ - shutdown: make(chan bool), - accepted: make(chan bool), - denied: make(chan error), + return a +} + +func (a *auth) EndableStart(doneChan chan<- error) error { + if nil != a.stopChan { + logging.Logger().Panic("Auth: auth is already running. Stop it before starting it again") } - if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err { - return nil, err - } + a.configPath = path.Join(*config.ConfigDir, ooccn.ConfigFileName) - a.noAuthConfigPath = path.Join(*config.ConfigDir, config.NoAuthProbeConfigFileName) - - conf := lfcc.New() - if lfcc.Exists(a.noAuthConfigPath) { - if err = conf.Load(&a.noAuthConfig, a.noAuthConfigPath); nil != err { - return nil, fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err) + conf := cc.New() + if cc.Exists(a.configPath) { + if err := conf.Load(&a.config, a.configPath); nil != err { + return fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", a.configPath, err) } } - a.centralClient = client.New() - a.centralClient.OnNotify(a.onNotify) - - return a, nil -} - -func (a *auth) EndableStart(endded chan<- error) error { - a.endded = endded - return a.start() -} - -func (a *auth) start() error { - if nil != a.noAuthConfig.DenyDate { - return fmt.Errorf("Cannot start because this probe have been denied from overFlow at %s", a.noAuthConfig.DenyDate.String()) + if nil != a.config.DenyDate { + return fmt.Errorf("Cannot start because this probe have been denied from overFlow at %s", a.config.DenyDate.String()) } - var err error - if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey { - err = a.serveConnect(*a.noAuthConfig.TempKey) - } else { - err = a.serveRegistration() - } - if nil != err { - return err + registerRequestHeader := "" + if ooccn.NoAuthProbeStateTypeNotRegisterd == a.config.State() { + i, err := oopai.GetRegistHeader() + if nil != err { + return fmt.Errorf("Auth: Gathering system information has been failed %v", err) + } + registerRequestHeader = i } - a.listen() + a.tempProbeKeyChan = make(chan string, 1) + a.acceptChan = make(chan *probeM.Probe, 1) + a.denyChan = make(chan *noauthprobeM.NoAuthProbe, 1) + + rpcInvoker := initRPCRegistry(a) + ch := client.NewClientHandler(rpcInvoker) + sb := client.NewSocketBuilder(&a.config, a.tempProbeKeyChan, registerRequestHeader) + if nil == sb { + return fmt.Errorf("Auth: Cannot create SocketBuilder") + } + + a.cClient = client.NewClient(ch, sb) + + a.doneChan = doneChan + a.stopChan = make(chan struct{}) + + a.stopWg.Add(1) + go a.handleAuth() return nil } -func (a *auth) listen() { - go func() { - for { - select { - case <-a.shutdown: - break - case <-a.accepted: - a.stop(nil) - break - case err := <-a.denied: - a.stop(err) - break - } - } +func (a *auth) Stop() { + a.destroy(nil) +} + +func (a *auth) destroy(err error) { + if a.stopChan == nil { + logging.Logger().Panic("Auth: auth must be started before stopping it") + } + close(a.stopChan) + a.stopWg.Wait() + a.stopChan = nil + + a.cClient.Close() + close(a.tempProbeKeyChan) + close(a.acceptChan) + close(a.denyChan) + + logging.Logger().Info(fmt.Sprintf("Auth: stopped")) + a.doneChan <- err +} + +func (a *auth) handleAuth() { + var err error + defer func() { + a.stopWg.Done() + a.destroy(err) }() -} -func (a *auth) serveRegistration() error { - var err error - header := http.Header{} - - var enc string - if enc, err = getRegistHeader(); nil != err { - return err + if err = a.cClient.Connect(); nil != err { + return } - header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc} - - var res *http.Response - if res, err = a.centralClient.Dial(a.entryURL, header); nil != err { - return err - } - - tempKey := res.Header.Get(module.NoAuthProbeHeader_SetNoAuthID) - a.noAuthConfig.TempKey = &tempKey - if err = lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err { - return err - } - - return nil -} - -func (a *auth) serveConnect(noAuthTempKey string) error { - var err error - header := http.Header{} - header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey} - - var res *http.Response - if res, err = a.centralClient.Dial(a.entryURL, header); nil != err { - return err - } - - logging.Logger.Debug(fmt.Sprintf("Auth: Connect HTTP Status[%s]", res.Status)) - - return nil -} - -func (a *auth) Shutdown(ctx context.Context) error { for { - a.stop(fmt.Errorf("Shutdown")) select { - case <-ctx.Done(): - return ctx.Err() + case tempProbeKey := <-a.tempProbeKeyChan: + err = a.handleTempProbeKey(tempProbeKey) + if nil != err { + return + } + case p := <-a.acceptChan: + err = a.handleAccept(p) + return + case nap := <-a.denyChan: + err = a.handleDeny(nap) + return + case <-a.stopChan: + return } } } -func (a *auth) stop(err error) { - defer close(a.shutdown) - a.shutdown <- true - close(a.accepted) - close(a.denied) - - ctx := context.Background() - if err := a.centralClient.Shutdown(ctx); nil != err { - logging.Logger.Error(fmt.Sprintf("Client of Central: %v", err)) +func (a *auth) handleTempProbeKey(tempProbeKey string) error { + a.config.TempKey = &tempProbeKey + if err := cc.Save(a.config, a.configPath, true); nil != err { + return err } - a.endded <- err + return nil +} + +func (a *auth) handleAccept(p *probeM.Probe) error { + config.Config.Probe.Key = &p.ProbeKey + + if err := cc.Save(*config.Config, *config.ConfigFilePath, true); nil != err { + return fmt.Errorf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err) + } + + return nil +} + +func (a *auth) handleDeny(nap *noauthprobeM.NoAuthProbe) error { + n := time.Now() + a.config.DenyDate = &n + if err := cc.Save(a.config, a.configPath, true); nil != err { + return fmt.Errorf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.configPath, err) + } + return nil } diff --git a/auth/client/client.go b/auth/client/client.go new file mode 100644 index 0000000..076a324 --- /dev/null +++ b/auth/client/client.go @@ -0,0 +1,11 @@ +package client + +import ( + cwfc "git.loafle.net/commons_go/websocket_fasthttp/client" + oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client" +) + +func NewClient(ch oogwc.ClientHandler, sb cwfc.SocketBuilder) oogwc.Client { + + return oogwc.New(ch, sb) +} diff --git a/auth/client/client_handlers.go b/auth/client/client_handlers.go new file mode 100644 index 0000000..c8aa856 --- /dev/null +++ b/auth/client/client_handlers.go @@ -0,0 +1,32 @@ +package client + +import ( + "fmt" + + "git.loafle.net/commons_go/logging" + crc "git.loafle.net/commons_go/rpc/client" + crr "git.loafle.net/commons_go/rpc/registry" + oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client" +) + +func NewClientHandler(rpcInvoker crr.RPCInvoker) oogwc.ClientHandler { + ch := &ClientHandlers{} + ch.ClientHandler = oogwc.NewClientHandler(rpcInvoker) + + return ch +} + +type ClientHandlers struct { + oogwc.ClientHandler +} + +func (ch *ClientHandlers) Init(clientCTX crc.ClientContext) error { + logging.Logger().Info(fmt.Sprintf("Auth: client has been initialized")) + + return nil +} + +func (ch *ClientHandlers) Destroy(clientCTX crc.ClientContext) { + logging.Logger().Info(fmt.Sprintf("Auth: client has been destroyed")) +} + diff --git a/auth/client/socket_builders.go b/auth/client/socket_builders.go new file mode 100644 index 0000000..0526b55 --- /dev/null +++ b/auth/client/socket_builders.go @@ -0,0 +1,61 @@ +package client + +import ( + "net/http" + + "git.loafle.net/commons_go/logging" + cwfc "git.loafle.net/commons_go/websocket_fasthttp/client" + ooccn "git.loafle.net/overflow/overflow_commons_go/config/noauthprobe" + oocmn "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe" + oopcc "git.loafle.net/overflow/overflow_probes/central/client" +) + +func NewSocketBuilder(config *ooccn.NoAuthProbeConfig, tempProbeKeyChan chan string, registerRequestHeader string) cwfc.SocketBuilder { + sb := &SocketBuilders{ + config: config, + tempProbeKeyChan: tempProbeKeyChan, + registerRequestHeader: registerRequestHeader, + } + sb.SocketBuilders = oopcc.NewSocketBuilder(oocmn.HTTPEntry_NoAuthProbe) + if nil == sb.SocketBuilders { + return nil + } + + return sb +} + +type SocketBuilders struct { + *oopcc.SocketBuilders + + config *ooccn.NoAuthProbeConfig + tempProbeKeyChan chan string + registerRequestHeader string +} + +func (sb *SocketBuilders) SocketHandler() cwfc.SocketHandler { + return newSocketHandler(sb.config, sb.tempProbeKeyChan) +} + +func (sb *SocketBuilders) GetRequestHeader() http.Header { + reqHeader := http.Header{} + + switch sb.config.State() { + case ooccn.NoAuthProbeStateTypeNotRegisterd: + reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_Method] = []string{oocmn.HTTPRequestHeaderValue_NoAuthProbe_Method_Regist} + reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_Info] = []string{sb.registerRequestHeader} + + case ooccn.NoAuthProbeStateTypeRegisterd: + reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_Method] = []string{oocmn.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect} + reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey] = []string{oocmn.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect} + } + + return reqHeader +} + +func (sb *SocketBuilders) Validate() { + sb.SocketBuilders.Validate() + + if nil == sb.config { + logging.Logger().Panic("Auth: NoAuthProbeConfig must be specified") + } +} diff --git a/auth/client/socket_handlers.go b/auth/client/socket_handlers.go new file mode 100644 index 0000000..cc6d077 --- /dev/null +++ b/auth/client/socket_handlers.go @@ -0,0 +1,42 @@ +package client + +import ( + "fmt" + "net/http" + + "git.loafle.net/commons_go/logging" + cwfc "git.loafle.net/commons_go/websocket_fasthttp/client" + ooccn "git.loafle.net/overflow/overflow_commons_go/config/noauthprobe" + oocmn "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe" +) + +type SocketHandlers struct { + cwfc.SocketHandlers + + config *ooccn.NoAuthProbeConfig + tempProbeKeyChan chan string +} + +func (sh *SocketHandlers) OnConnect(socketContext cwfc.SocketContext, res *http.Response) { + logging.Logger().Info(fmt.Sprintf("Auth: client has been connected res[%v]", res)) + + switch sh.config.State() { + case ooccn.NoAuthProbeStateTypeNotRegisterd: + tempProbeKey := res.Header.Get(oocmn.HTTPResponseHeaderKey_NoAuthProbe_SetTempProbeKey) + sh.tempProbeKeyChan <- tempProbeKey + case ooccn.NoAuthProbeStateTypeRegisterd: + } + +} + +func (sh *SocketHandlers) OnDisconnect(soc cwfc.Socket) { + logging.Logger().Info(fmt.Sprintf("Auth: client has been disconnected soc[%v]", soc)) + +} + +func newSocketHandler(config *ooccn.NoAuthProbeConfig, tempProbeKeyChan chan string) cwfc.SocketHandler { + return &SocketHandlers{ + config: config, + tempProbeKeyChan: tempProbeKeyChan, + } +} diff --git a/auth/registration.go b/auth/info/info.go similarity index 57% rename from auth/registration.go rename to auth/info/info.go index 0cae0f2..ee5a449 100644 --- a/auth/registration.go +++ b/auth/info/info.go @@ -1,4 +1,4 @@ -package auth +package info import ( "bytes" @@ -8,23 +8,24 @@ import ( "net" "git.loafle.net/commons_go/util/net/gateway" - "git.loafle.net/overflow/overflow_probes/central/api/module" + noauthprobeM "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe/model" "git.loafle.net/overflow/overflow_probes/config" "github.com/shirou/gopsutil/host" ) -func getRegistHeader() (string, error) { +func GetRegistHeader() (string, error) { var err error - nap := module.NoAuthProbe{ - APIKey: config.CFG.Central.APIKey, + nap := noauthprobeM.NoAuthProbe{ + APIKey: config.Config.Central.APIKey, } - var nad *module.NoAuthProbeDescription - if nad, err = getDescription(); nil != err { + + var napd *noauthprobeM.NoAuthProbeDescription + if napd, err = getDescription(); nil != err { return "", err } var buf []byte - if buf, err = json.Marshal(nad); nil != err { + if buf, err = json.Marshal(napd); nil != err { return "", err } nap.Description = string(buf) @@ -38,46 +39,49 @@ func getRegistHeader() (string, error) { return enc, nil } -func getDescription() (*module.NoAuthProbeDescription, error) { - nad := &module.NoAuthProbeDescription{} +func getDescription() (*noauthprobeM.NoAuthProbeDescription, error) { + var err error + napd := &noauthprobeM.NoAuthProbeDescription{} - if err := getHost(&nad.Host); nil != err { + if napd.Host, err = getHost(); nil != err { return nil, err } - if err := getNetwork(&nad.Network); nil != err { + if napd.Network, err = getNetwork(); nil != err { return nil, err } - return nad, nil + return napd, nil } -func getHost(h *module.NoAuthProbeDescriptionHost) error { +func getHost() (*noauthprobeM.NoAuthProbeDescriptionHost, error) { if i, err := host.Info(); nil == err { + h := &noauthprobeM.NoAuthProbeDescriptionHost{} + h.Name = i.Hostname h.OS = i.OS h.Platform = i.Platform h.PlatformFamily = i.PlatformFamily h.KernelVersion = i.KernelVersion h.HostID = i.HostID - } else { - return err - } - return nil + return h, nil + } else { + return nil, err + } } -func getNetwork(n *module.NoAuthProbeDescriptionNetwork) error { +func getNetwork() (*noauthprobeM.NoAuthProbeDescriptionNetwork, error) { var ip net.IP var iface string var err error if ip, iface, err = gateway.DiscoverGateway(); nil != err { - return err + return nil, err } interfaces, err := net.Interfaces() if err != nil { - return err + return nil, err } idx := -1 @@ -90,9 +94,11 @@ func getNetwork(n *module.NoAuthProbeDescriptionNetwork) error { } if -1 == idx { - return errors.New("Interface of gateway is not exist") + return nil, errors.New("Interface of gateway is not exist") } + n := &noauthprobeM.NoAuthProbeDescriptionNetwork{} + i := interfaces[idx] n.Name = i.Name @@ -109,8 +115,8 @@ func getNetwork(n *module.NoAuthProbeDescriptionNetwork) error { } n.Address = buffer.String() } else { - return err + return nil, err } - return nil + return n, nil } diff --git a/auth/on_notify.go b/auth/on_notify.go deleted file mode 100644 index 87aaa32..0000000 --- a/auth/on_notify.go +++ /dev/null @@ -1,55 +0,0 @@ -package auth - -import ( - "fmt" - "time" - - lfcc "git.loafle.net/commons_go/config" - "git.loafle.net/commons_go/logging" - "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/config" -) - -func (a *auth) onNotify(method string, params []string) { - switch method { - case module.NoAuthProbeService_AcceptNoAuthProbe: - a.onNoAuthProbeAccept(params) - break - case module.NoAuthProbeService_DenyNoauthProbe: - a.onNoAuthProbeDeny(params) - break - } - -} - -func (a *auth) onNoAuthProbeAccept(params []string) { - var err error - probeKey := params[0] - - // if lfcc.Exists(a.probeConfigPath) { - // if err = lfcc.Load(&a.probeConfig, a.probeConfigPath); nil != err { - // logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", a.probeConfigPath, err)) - // } - // } - - config.CFG.Probe.Key = &probeKey - - if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err)) - return - } - - a.accepted <- true -} - -func (a *auth) onNoAuthProbeDeny(params []string) { - n := time.Now() - a.noAuthConfig.DenyDate = &n - if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err)) - - return - } - - a.denied <- fmt.Errorf("This probe have been denied from overFlow") -} diff --git a/auth/rpc.go b/auth/rpc.go new file mode 100644 index 0000000..7eaf9ba --- /dev/null +++ b/auth/rpc.go @@ -0,0 +1,18 @@ +package auth + +import ( + crr "git.loafle.net/commons_go/rpc/registry" + oopar "git.loafle.net/overflow/overflow_probes/auth/rpc" +) + +func initRPCRegistry(a *auth) crr.RPCInvoker { + rpcRegistry := crr.NewRPCRegistry() + + napService := &oopar.NoAuthProbeService{ + AcceptChan: a.acceptChan, + DenyChan: a.denyChan, + } + rpcRegistry.RegisterService(napService, "") + + return rpcRegistry +} diff --git a/auth/rpc/NoAuthProbeService.go b/auth/rpc/NoAuthProbeService.go new file mode 100644 index 0000000..684aa47 --- /dev/null +++ b/auth/rpc/NoAuthProbeService.go @@ -0,0 +1,20 @@ +package rpc + +import ( + noauthprobeM "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe/model" + probeM "git.loafle.net/overflow/overflow_commons_go/modules/probe/model" +) + +type NoAuthProbeService struct { + AcceptChan chan *probeM.Probe + DenyChan chan *noauthprobeM.NoAuthProbe +} + +func (s *NoAuthProbeService) Accept(probe *probeM.Probe) { + s.AcceptChan <- probe +} + +func (s *NoAuthProbeService) Deny(noAuthProbe *noauthprobeM.NoAuthProbe) { + s.DenyChan <- noAuthProbe +} + diff --git a/auth/rpc/rpc.go b/auth/rpc/rpc.go new file mode 100644 index 0000000..9ab1e3e --- /dev/null +++ b/auth/rpc/rpc.go @@ -0,0 +1 @@ +package rpc diff --git a/central/api/module/crawler.go b/central/api/module/crawler.go deleted file mode 100644 index 7f63874..0000000 --- a/central/api/module/crawler.go +++ /dev/null @@ -1,7 +0,0 @@ -package module - -const ( - CrawlerService_Install = "CrawlerService.install" - CrawlerService_Uninstall = "CrawlerService.uninstall" - CrawlerService_Update = "CrawlerService.update" -) diff --git a/central/api/module/discovery.go b/central/api/module/discovery.go deleted file mode 100644 index 8ceb969..0000000 --- a/central/api/module/discovery.go +++ /dev/null @@ -1,6 +0,0 @@ -package module - -const ( - DiscoveryService_Start = "DiscoveryService.start" - DiscoveryService_Stop = "DiscoveryService.stop" -) diff --git a/central/api/module/log.go b/central/api/module/log.go deleted file mode 100644 index 7d5780c..0000000 --- a/central/api/module/log.go +++ /dev/null @@ -1,5 +0,0 @@ -package module - -const ( - LogService_Send = "LogService.send" -) diff --git a/central/api/module/noauthprobe.go b/central/api/module/noauthprobe.go deleted file mode 100644 index 0e868d0..0000000 --- a/central/api/module/noauthprobe.go +++ /dev/null @@ -1,45 +0,0 @@ -package module - -import "time" - -const ( - NoAuthProbeHeader_NoAuthID = "overFlow-NoAuth-ID" - NoAuthProbeHeader_NoAuthRegist = "overFlow-NoAuth-Regist" - NoAuthProbeHeader_SetNoAuthID = "overFlow-Set-NoAuth-ID" -) - -type NoAuthProbe struct { - ID uint64 `json:"id"` - Description string `json:"description"` - TempProbeKey string `json:"tempProbeKey"` - CreateDate time.Time `json:"createDate"` - APIKey string `json:"apiKey"` -} - -const ( - NoAuthProbeService_Regist = "NoAuthProbeService.regist" - NoAuthProbeService_AcceptNoAuthProbe = "NoAuthProbeService.acceptNoAuthProbe" - NoAuthProbeService_DenyNoauthProbe = "NoAuthProbeService.denyNoauthProbe" -) - -type NoAuthProbeDescription struct { - Host NoAuthProbeDescriptionHost `json:"host"` - Network NoAuthProbeDescriptionNetwork `json:"network"` -} - -type NoAuthProbeDescriptionHost struct { - Name string `json:"name"` - OS string `json:"os"` - Platform string `json:"paltform"` - PlatformFamily string `json:"platformFamily"` - PlatformVersion string `json:"platformVersion"` - KernelVersion string `json:"kernelVersion"` - HostID string `json:"hostID"` -} - -type NoAuthProbeDescriptionNetwork struct { - Name string `json:"name"` - Address string `json:"address"` - Gateway string `json:"gateway"` - MacAddress string `json:"macAddress"` -} diff --git a/central/api/module/probe.go b/central/api/module/probe.go deleted file mode 100644 index b1c2bf2..0000000 --- a/central/api/module/probe.go +++ /dev/null @@ -1,22 +0,0 @@ -package module - -import "time" - -const ( - ProbeHeader_ProbeKey = "overFlow-Probe-Key" - ProbeHeader_Probe_EncryptionKey = "overFlow-Probe-EncryptionKey" -) - -const ( - ProbeService_Started = "ProbeService.started" - ProbeService_Stopped = "ProbeService.stopped" - ProbeService_Update = "ProbeService.update" -) - -type Probe struct { - ID uint64 `json:"id"` - Description string `json:"description"` - TempProbeKey string `json:"tempProbeKey"` - CreateDate time.Time `json:"createDate"` - ProbeKey string `json:"probeKey"` -} diff --git a/central/api/module/sensor.go b/central/api/module/sensor.go deleted file mode 100644 index d22c8c0..0000000 --- a/central/api/module/sensor.go +++ /dev/null @@ -1,9 +0,0 @@ -package module - -const ( - SensorService_Start = "SensorService.start" - SensorService_Stop = "SensorService.stop" - SensorService_Add = "SensorService.add" - SensorService_Remove = "SensorService.remove" - SensorService_Update = "SensorService.update" -) diff --git a/central/client/client.go b/central/client/client.go index af527b1..da13c8e 100644 --- a/central/client/client.go +++ b/central/client/client.go @@ -1,314 +1 @@ package client - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "sync" - - "git.loafle.net/commons_go/logging" - "git.loafle.net/overflow/overflow_probes/central/client/protocol" - "git.loafle.net/overflow/overflow_probes/config" - "github.com/gorilla/websocket" -) - -const ( - ProtocolName = "RPC/1.0" -) - -type ( - OnNotifyFunc func(method string, params []string) - OnCloseFunc func(code int, text string) -) - -type ServerError string - -func (e ServerError) Error() string { - return string(e) -} - -var ErrShutdown = errors.New("connection is shut down") - -type Call struct { - Method string // The name of the service and method to call. - Args []string // The argument to the function (*struct). - Result interface{} // The reply from the function (*struct). - Error error // After completion, the error status. - Done chan *Call // Strobes when call is complete. -} - -func (c *Call) done() { - select { - case c.Done <- c: - // ok - default: - // We don't want to block here. It is the caller's responsibility to make - // sure the channel has enough buffer space. See comment in Go(). - logging.Logger.Debug("Client: discarding Call reply due to insufficient Done chan capacity") - } -} - -type Client interface { - Dial(url string, header http.Header) (*http.Response, error) - Call(method string, args []string, result interface{}) error - Notify(method string, args []string) error - OnNotify(cb OnNotifyFunc) - OnClose(cb OnCloseFunc) - Shutdown(ctx context.Context) error -} - -type client struct { - conn *websocket.Conn - sendMutex sync.Mutex - request protocol.Request - notification protocol.Notification - mutex sync.Mutex - requestID uint64 - pending map[uint64]*Call - closing bool // user has called Close - shutdown bool // server has told us to stop - onNotifyHandler OnNotifyFunc - onCloseHandlers []OnCloseFunc -} - -func New() Client { - c := &client{ - requestID: 0, - pending: make(map[uint64]*Call), - onCloseHandlers: make([]OnCloseFunc, 1), - } - - return c -} - -func (c *client) Dial(url string, header http.Header) (*http.Response, error) { - var err error - var res *http.Response - - dialer := websocket.Dialer{ - ReadBufferSize: config.CFG.Central.ReadBufferSize, - WriteBufferSize: config.CFG.Central.WriteBufferSize, - } - - if c.conn, res, err = dialer.Dial(url, header); nil != err { - return nil, err - } - - c.conn.SetCloseHandler(c.connCloseHandler) - - go c.input() - - return res, nil -} - -func (c *client) Call(method string, args []string, result interface{}) error { - call := <-c.goCall(method, args, result, make(chan *Call, 1)).Done - return call.Error -} - -func (c *client) Notify(method string, args []string) error { - c.sendMutex.Lock() - defer c.sendMutex.Unlock() - - c.notification.Protocol = ProtocolName - c.notification.Method = method - c.notification.Params = args - - if err := c.conn.WriteJSON(c.notification); nil != err { - return err - } - return nil -} - -func (c *client) OnNotify(cb OnNotifyFunc) { - c.onNotifyHandler = cb -} - -func (c *client) OnClose(cb OnCloseFunc) { - c.onCloseHandlers = append(c.onCloseHandlers, cb) -} - -func (c *client) Shutdown(ctx context.Context) error { - c.mutex.Lock() - if c.closing { - c.mutex.Unlock() - return ErrShutdown - } - c.closing = true - c.mutex.Unlock() - - return c.conn.Close() -} - -// Go invokes the function asynchronously. It returns the Call structure representing -// the invocation. The done channel will signal when the call is complete by returning -// the same Call object. If done is nil, Go will allocate a new channel. -// If non-nil, done must be buffered or Go will deliberately crash. -func (c *client) goCall(method string, args []string, result interface{}, done chan *Call) *Call { - call := new(Call) - call.Method = method - call.Args = args - call.Result = result - if done == nil { - done = make(chan *Call, 10) // buffered. - } else { - // If caller passes done != nil, it must arrange that - // done has enough buffer for the number of simultaneous - // RPCs that will be using that channel. If the channel - // is totally unbuffered, it's best not to run at all. - if cap(done) == 0 { - logging.Logger.Panic("Client: done channel is unbuffered") - } - } - call.Done = done - c.sendCall(call) - return call -} - -func (c *client) sendCall(call *Call) { - c.sendMutex.Lock() - defer c.sendMutex.Unlock() - - // Register this call. - c.mutex.Lock() - if c.shutdown || c.closing { - call.Error = ErrShutdown - c.mutex.Unlock() - call.done() - return - } - c.requestID++ - id := c.requestID - c.pending[id] = call - c.mutex.Unlock() - - // Encode and send the request. - c.request.Protocol = ProtocolName - c.request.Method = call.Method - c.request.Params = call.Args - c.request.ID = id - if err := c.conn.WriteJSON(c.request); nil != err { - c.mutex.Lock() - call = c.pending[id] - delete(c.pending, id) - c.mutex.Unlock() - if call != nil { - call.Error = err - call.done() - } - } -} - -func (c *client) input() { - var err error - var res protocol.Response - var noti protocol.Notification - var messageType int - var buff []byte - - for err == nil { - if messageType, buff, err = c.conn.ReadMessage(); nil != err { - logging.Logger.Error(fmt.Sprintf("Client: Reader error[%v]", err)) - continue - } - logging.Logger.Debug(fmt.Sprintf("Client: messageType:%d", messageType)) - - if err = json.Unmarshal(buff, ¬i); nil != err { - if err = json.Unmarshal(buff, &res); nil != err { - logging.Logger.Error(fmt.Sprintf("Client: Decode error[%v]", err)) - continue - } else { - if err = c.onResponse(res); nil != err { - logging.Logger.Error(fmt.Sprintf("Client: Response error[%v]", err)) - continue - } - } - } else { - c.onNotification(noti) - } - - // if err = json.Unmarshal(buff, &res); nil != err { - // if err = json.Unmarshal(buff, ¬i); nil != err { - // logging.Logger.Error(fmt.Sprintf("Client: Decode error[%v]", err)) - // continue - // } else { - // c.onNotification(noti) - // } - // } else { - // err = c.onResponse(res) - // } - } - // Terminate pending calls. - c.sendMutex.Lock() - c.mutex.Lock() - c.shutdown = true - closing := c.closing - if err == io.EOF { - if closing { - err = ErrShutdown - } else { - err = io.ErrUnexpectedEOF - } - } - for _, call := range c.pending { - call.Error = err - call.done() - } - c.mutex.Unlock() - c.sendMutex.Unlock() - - if err != io.EOF && !closing { - logging.Logger.Debug(fmt.Sprintf("Client: client protocol error:%v", err)) - } -} - -func (c *client) onResponse(res protocol.Response) error { - var err error - id := res.ID - c.mutex.Lock() - call := c.pending[id] - delete(c.pending, id) - c.mutex.Unlock() - - switch { - case call == nil: - - case res.Error != nil: - // We've got an error response. Give this to the request; - // any subsequent requests will get the ReadResponseBody - // error if there is one. - if protocol.ProtocolErrorCodeInternal == res.Error.Code { - if nil != res.Error.Message { - call.Error = ServerError(*res.Error.Message) - } - } - - call.done() - default: - if err = json.Unmarshal(*res.Result, call.Result); nil != err { - call.Error = errors.New("reading body " + err.Error()) - } - call.done() - } - - return err -} - -func (c *client) onNotification(noti protocol.Notification) { - if nil == c.onNotifyHandler { - return - } - - c.onNotifyHandler(noti.Method, noti.Params) -} - -func (c *client) connCloseHandler(code int, text string) error { - for _, h := range c.onCloseHandlers { - h(code, text) - } - - return nil -} diff --git a/central/client/probe.go b/central/client/probe.go deleted file mode 100644 index e501f23..0000000 --- a/central/client/probe.go +++ /dev/null @@ -1,21 +0,0 @@ -package client - -import ( - "net/http" - - "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/config" -) - -func ConnectToCentralAsProbe(c Client, entryURL string) (*http.Response, error) { - header := http.Header{} - header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key} - - var res *http.Response - var err error - if res, err = c.Dial(entryURL, header); nil != err { - return nil, err - } - - return res, nil -} diff --git a/central/client/protocol/header.go b/central/client/protocol/header.go deleted file mode 100644 index 526023e..0000000 --- a/central/client/protocol/header.go +++ /dev/null @@ -1,5 +0,0 @@ -package protocol - -type Header struct { - Protocol string `json:"protocol"` -} diff --git a/central/client/protocol/notification.go b/central/client/protocol/notification.go deleted file mode 100644 index c369e41..0000000 --- a/central/client/protocol/notification.go +++ /dev/null @@ -1,7 +0,0 @@ -package protocol - -type Notification struct { - Header - Method string `json:"method"` - Params []string `json:"params,omitempty"` -} diff --git a/central/client/protocol/protocol_error.go b/central/client/protocol/protocol_error.go deleted file mode 100644 index b199899..0000000 --- a/central/client/protocol/protocol_error.go +++ /dev/null @@ -1,19 +0,0 @@ -package protocol - -type ProtocolErrorCode int - -const ( - ProtocolErrorCodeParse ProtocolErrorCode = -32700 - ProtocolErrorCodeInvalidRequest ProtocolErrorCode = -32600 - ProtocolErrorCodeNotFoundMethod ProtocolErrorCode = -32601 - ProtocolErrorCodeInvalidParams ProtocolErrorCode = -32602 - ProtocolErrorCodeInternal ProtocolErrorCode = -32603 - // -32000 ~ -32099 - ProtocolErrorCodeServer ProtocolErrorCode = -32000 -) - -type ProtocolError struct { - Code ProtocolErrorCode `json:"code"` - Message *string `json:"message"` - Data interface{} `json:"data"` -} diff --git a/central/client/protocol/request.go b/central/client/protocol/request.go deleted file mode 100644 index 866058a..0000000 --- a/central/client/protocol/request.go +++ /dev/null @@ -1,6 +0,0 @@ -package protocol - -type Request struct { - Notification - ID uint64 `json:"id,omitempty"` -} diff --git a/central/client/protocol/response.go b/central/client/protocol/response.go deleted file mode 100644 index 8e681c7..0000000 --- a/central/client/protocol/response.go +++ /dev/null @@ -1,10 +0,0 @@ -package protocol - -import "encoding/json" - -type Response struct { - Header - ID uint64 `json:"id"` - Result *json.RawMessage `json:"result,omitempty"` - Error *ProtocolError `json:"error,omitempty"` -} diff --git a/central/client/socket_builders.go b/central/client/socket_builders.go new file mode 100644 index 0000000..5120f8c --- /dev/null +++ b/central/client/socket_builders.go @@ -0,0 +1,28 @@ +package client + +import ( + "fmt" + + "git.loafle.net/commons_go/logging" + cunu "git.loafle.net/commons_go/util/net/url" + cwfc "git.loafle.net/commons_go/websocket_fasthttp/client" + + "git.loafle.net/overflow/overflow_probes/config" +) + +func NewSocketBuilder(entryPath string) *SocketBuilders { + sb := &SocketBuilders{} + url, err := cunu.Join(config.Config.Central.URL, entryPath) + if nil != err { + logging.Logger().Error(fmt.Sprintf("Auth: Cannot create SocketBuilder %v", err)) + return nil + } + sb.URL = url + sb.TLSConfig = nil + + return sb +} + +type SocketBuilders struct { + cwfc.SocketBuilders +} diff --git a/config/config.go b/config/config.go index e536846..665aac8 100644 --- a/config/config.go +++ b/config/config.go @@ -1,28 +1,13 @@ package config -const ( - ConfigFileName = "config.json" +import ( + ooccp "git.loafle.net/overflow/overflow_commons_go/config/probe" ) var ( ConfigDir *string ConfigFilePath *string - CFG *Config EncryptionKey *string + + Config *ooccp.Config ) - -type Config struct { - Central CentralConfig `json:"central" yaml:"central" toml:"central"` - Probe ProbeConfig `json:"probe" yaml:"probe" toml:"probe"` -} - -type CentralConfig struct { - URL string `required:"true" json:"url" yaml:"url" toml:"url"` - APIKey string `required:"true" json:"apiKey" yaml:"apiKey" toml:"apiKey"` - ReadBufferSize int `default:"8192" json:"readBufferSize" yaml:"readBufferSize" toml:"readBufferSize"` - WriteBufferSize int `default:"8192" json:"writeBufferSize" yaml:"writeBufferSize" toml:"writeBufferSize"` -} - -type ProbeConfig struct { - Key *string `json:"key,omitempty" yaml:"key" toml:"key"` -} diff --git a/config/noauth.go b/config/noauth.go deleted file mode 100644 index 5754888..0000000 --- a/config/noauth.go +++ /dev/null @@ -1,12 +0,0 @@ -package config - -import "time" - -const ( - NoAuthProbeConfigFileName = "noauthprobe.json" -) - -type NoAuthProbeConfig struct { - TempKey *string `json:"tempKey,omitempty" yaml:"tempKey" toml:"tempKey"` - DenyDate *time.Time `json:"denyDate,omitempty" yaml:"denyDate" toml:"denyDate"` -} diff --git a/constants.go b/constants.go new file mode 100644 index 0000000..31d3531 --- /dev/null +++ b/constants.go @@ -0,0 +1,6 @@ +package main + +const ( + ConfigPathFlagName = "config-dir" + ConfigFileName = "config.json" +) diff --git a/glide.yaml b/glide.yaml index 61be519..392c4a3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -9,3 +9,5 @@ import: version: v2.17.08 - package: github.com/takama/daemon version: 0.9.1 +- package: github.com/dgrijalva/jwt-go + version: v3.1.0 diff --git a/main.go b/main.go index 0ff9d24..b0e8be2 100644 --- a/main.go +++ b/main.go @@ -10,13 +10,13 @@ import ( "syscall" "time" - "git.loafle.net/overflow/overflow_probes/commons" - lfcc "git.loafle.net/commons_go/config" "git.loafle.net/commons_go/logging" + oocc "git.loafle.net/overflow/overflow_commons_go/config" + ooccp "git.loafle.net/overflow/overflow_commons_go/config/probe" "git.loafle.net/overflow/overflow_probes/auth" + "git.loafle.net/overflow/overflow_probes/commons" "git.loafle.net/overflow/overflow_probes/config" - "git.loafle.net/overflow/overflow_probes/probe" ) /* @@ -38,8 +38,7 @@ import ( // ) var ( - daemonCommand *string - configDir *string + configDir *string ) func init() { @@ -49,59 +48,45 @@ func init() { // flag.PrintDefaults() // } - configDir = flag.String("config-dir", ".", "The directory of config") + configDir = oocc.FlagConfigDir() flag.Parse() } func main() { - var err error - var instance interface{} - - defer logging.Logger.Sync() + defer logging.Logger().Sync() printBanner() + loadConfig() - if dir, err := lfcc.ABSPathify(*configDir); nil != err { - logging.Logger.Panic(fmt.Sprintf("Config path[%s] is not valid", *configDir)) - } else { - logging.Logger.Debug(fmt.Sprintf("Config path: %s", dir)) - config.ConfigDir = &dir - } - cfp := path.Join(*config.ConfigDir, config.ConfigFileName) - config.ConfigFilePath = &cfp - - conf := lfcc.New() - config.CFG = &config.Config{} - if err := conf.Load(config.CFG, *config.ConfigFilePath); nil != err { - logging.Logger.Panic(fmt.Sprintf("Config is not valid: %v", err)) - } + var instance interface{} go func() { - if nil == config.CFG.Probe.Key || "" == *config.CFG.Probe.Key { - if instance, err = auth.New(); nil != err { - logging.Logger.Error(fmt.Sprintf("Auth error: %v", err)) + if ooccp.ProbeStateTypeNotAuthorized == config.Config.Probe.State() { + var err error + instance = auth.New() + + authDoneChan := make(chan error, 1) + defer close(authDoneChan) + + if err := instance.(commons.EndableStarter).EndableStart(authDoneChan); err != nil { + logging.Logger().Error(err.Error()) return } - endded := make(chan error) - defer close(endded) - if err := instance.(commons.EndableStarter).EndableStart(endded); err != nil { - logging.Logger.Error(fmt.Sprintf("Auther error: %v", err)) - return - } - if err := <-endded; nil != err { - logging.Logger.Error(fmt.Sprintf("Auther error: %v", err)) + err = <-authDoneChan + if nil != err { + logging.Logger().Error(err.Error()) return } } - if instance, err = probe.New(); nil != err { - logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) - return - } - if err := instance.(commons.Starter).Start(); err != nil { - logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) - return - } + // if instance, err = probe.New(); nil != err { + // logging.Logger().Error(fmt.Sprintf("Probe error: %v", err)) + // return + // } + // if err := instance.(commons.Starter).Start(); err != nil { + // logging.Logger().Error(fmt.Sprintf("Probe error: %v", err)) + // return + // } }() @@ -121,7 +106,25 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := instance.(commons.Shutdowner).Shutdown(ctx); err != nil { - logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) + logging.Logger().Error(fmt.Sprintf("Probe error: %v", err)) + } + +} + +func loadConfig() { + if dir, err := lfcc.ABSPathify(*configDir); nil != err { + logging.Logger().Panic(fmt.Sprintf("Config path[%s] is not valid", *configDir)) + } else { + logging.Logger().Debug(fmt.Sprintf("Config path: %s", dir)) + config.ConfigDir = &dir + } + cfp := path.Join(*config.ConfigDir, ooccp.ConfigFileName) + config.ConfigFilePath = &cfp + + conf := lfcc.New() + config.Config = &ooccp.Config{} + if err := conf.Load(config.Config, *config.ConfigFilePath); nil != err { + logging.Logger().Panic(fmt.Sprintf("Config is not valid: %v", err)) } } @@ -136,8 +139,8 @@ const ( ██║ ██║██║ ██║█████╗ ██████╔╝█████╗ ██║ ██║ ██║██║ █╗ ██║ ██║ ██║╚██╗ ██╔╝██╔══╝ ██╔══██╗██╔══╝ ██║ ██║ ██║██║███╗██║ ╚██████╔╝ ╚████╔╝ ███████╗██║ ██║██║ ███████╗╚██████╔╝╚███╔███╔╝ - ╚═════╝ ╚═══╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚══════╝ ╚═════╝ ╚══╝╚══╝ - + ╚═════╝ ╚═══╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚══════╝ ╚═════╝ ╚══╝╚══╝ + ` ) diff --git a/probe/on_notify.go b/probe/on_notify.go index 5842523..107971d 100644 --- a/probe/on_notify.go +++ b/probe/on_notify.go @@ -1,55 +1,55 @@ package probe -import ( - "fmt" +// import ( +// "fmt" - "git.loafle.net/commons_go/logging" - "git.loafle.net/overflow/overflow_probes/central/api/module" -) +// "git.loafle.net/commons_go/logging" +// "git.loafle.net/overflow/overflow_probes/central/api/module" +// ) -func (p *probe) onNotify(method string, params []string) { - var err error - switch method { - case module.CrawlerService_Install: +// func (p *probe) onNotify(method string, params []string) { +// var err error +// switch method { +// case module.CrawlerService_Install: - break - case module.CrawlerService_Uninstall: +// break +// case module.CrawlerService_Uninstall: - break - case module.CrawlerService_Update: +// break +// case module.CrawlerService_Update: - break - case module.SensorService_Start: +// break +// case module.SensorService_Start: - break - case module.SensorService_Stop: +// break +// case module.SensorService_Stop: - break - case module.SensorService_Add: +// break +// case module.SensorService_Add: - break - case module.SensorService_Remove: +// break +// case module.SensorService_Remove: - break - case module.SensorService_Update: +// break +// case module.SensorService_Update: - break - case module.ProbeService_Update: +// break +// case module.ProbeService_Update: - break - case module.LogService_Send: +// break +// case module.LogService_Send: - break - case module.DiscoveryService_Start: +// break +// case module.DiscoveryService_Start: - break - case module.DiscoveryService_Stop: +// break +// case module.DiscoveryService_Stop: - break - } +// break +// } - if nil != err { - logging.Logger.Error(fmt.Sprintf("Probe notify error: %v", err)) - } +// if nil != err { +// logging.Logger().Error(fmt.Sprintf("Probe notify error: %v", err)) +// } -} +// } diff --git a/probe/probe.go b/probe/probe.go index c295ea9..0a74a6a 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -1,132 +1,128 @@ package probe -import ( - "context" - "fmt" - "net/http" +// import ( +// "context" +// "fmt" +// "net/http" - "git.loafle.net/commons_go/logging" - "git.loafle.net/overflow/overflow_probes/central/api/module" - "git.loafle.net/overflow/overflow_probes/central/client" - "git.loafle.net/overflow/overflow_probes/commons" - "git.loafle.net/overflow/overflow_probes/config" - opuu "git.loafle.net/overflow/overflow_probes/util/url" -) +// "git.loafle.net/commons_go/logging" +// "git.loafle.net/overflow/overflow_probes/central/api/module" +// "git.loafle.net/overflow/overflow_probes/central/client" +// "git.loafle.net/overflow/overflow_probes/commons" +// "git.loafle.net/overflow/overflow_probes/config" +// opuu "git.loafle.net/overflow/overflow_probes/util/url" +// ) -const ( - probeEntryPoint = "/probe" - fileEntryPoint = "/file" - metricEntryPoint = "/metric" -) +// const ( +// probeEntryPoint = "/probe" +// fileEntryPoint = "/file" +// metricEntryPoint = "/metric" +// ) -type Prober interface { - commons.Starter - commons.Shutdowner -} +// type Prober interface { +// commons.Starter +// commons.Shutdowner +// } -type probe struct { - probeEntryURL string - fileEntryURL string - metricEntryURL string +// type probe struct { +// probeEntryURL string +// fileEntryURL string +// metricEntryURL string - probeClient client.Client - fileClient client.Client - metricClient client.Client +// probeClient client.Client +// fileClient client.Client +// metricClient client.Client - shutdown chan bool -} +// shutdown chan bool +// } -func New() (Prober, error) { - p := &probe{ - shutdown: make(chan bool), - } +// func New() (Prober, error) { +// p := &probe{ +// shutdown: make(chan bool), +// } - var err error +// var err error - if p.probeEntryURL, err = opuu.Join(config.CFG.Central.URL, probeEntryPoint); nil != err { - return nil, err - } - if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err { - return nil, err - } - if p.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err { - return nil, err - } +// if p.probeEntryURL, err = opuu.Join(config.Config.Central.URL, probeEntryPoint); nil != err { +// return nil, err +// } +// if p.fileEntryURL, err = opuu.Join(config.Config.Central.URL, fileEntryPoint); nil != err { +// return nil, err +// } +// if p.metricEntryURL, err = opuu.Join(config.Config.Central.URL, metricEntryPoint); nil != err { +// return nil, err +// } - p.probeClient = client.New() - p.fileClient = client.New() - p.metricClient = client.New() +// p.probeClient = client.New() +// p.fileClient = client.New() +// p.metricClient = client.New() - return p, nil -} +// return p, nil +// } -func (p *probe) Start() error { +// func (p *probe) Start() error { +// if err := p.connectToCentral(); nil != err { +// return err +// } - return p.start() -} +// p.listen() -func (p *probe) start() error { - if err := p.connectToCentral(); nil != err { - return err - } +// return nil +// } - p.listen() +// func (p *probe) listen() { +// go func() { +// for { +// select { +// case <-p.shutdown: +// break +// } +// } +// }() - return nil -} +// } -func (p *probe) listen() { - go func() { - for { - select { - case <-p.shutdown: - break - } - } - }() -} +// func (p *probe) connectToCentral() error { +// var err error +// var res *http.Response +// if res, err = client.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err { +// return err +// } -func (p *probe) connectToCentral() error { - var err error - var res *http.Response - if res, err = client.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err { - return err - } +// encryptionKey := res.Header.Get(module.ProbeHeader_Probe_EncryptionKey) +// config.EncryptionKey = &encryptionKey - encryptionKey := res.Header.Get(module.ProbeHeader_Probe_EncryptionKey) - config.EncryptionKey = &encryptionKey +// p.probeClient.OnNotify(p.onNotify) - p.probeClient.OnNotify(p.onNotify) +// // if _, err = client.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err { +// // return err +// // } - // if _, err = client.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err { - // return err - // } +// return nil +// } - return nil -} +// func (p *probe) sendNotifyToCentral(method string, params ...string) { +// if err := p.probeClient.Notify(method, params); nil != err { +// logging.Logger().Error(fmt.Sprintf("Probe notify: %v", err)) +// } +// } -func (p *probe) sendNotifyToCentral(method string, params ...string) { - if err := p.probeClient.Notify(method, params); nil != err { - logging.Logger.Error(fmt.Sprintf("Probe notify: %v", err)) - } -} +// func (p *probe) Shutdown(ctx context.Context) error { +// for { +// p.stop(fmt.Errorf("Shutdown")) +// select { +// case <-ctx.Done(): +// return ctx.Err() +// } +// } +// } -func (p *probe) Shutdown(ctx context.Context) error { - for { - p.stop(fmt.Errorf("Shutdown")) - select { - case <-ctx.Done(): - return ctx.Err() - } - } -} +// func (p *probe) stop(err error) { +// defer close(p.shutdown) -func (p *probe) stop(err error) { - defer close(p.shutdown) +// ctx := context.Background() +// if err := p.probeClient.Shutdown(ctx); nil != err { +// logging.Logger().Error(fmt.Sprintf("Client of Probe: %v", err)) +// } - ctx := context.Background() - if err := p.probeClient.Shutdown(ctx); nil != err { - logging.Logger.Error(fmt.Sprintf("Client of Probe: %v", err)) - } - -} +// } diff --git a/util/url/url.go b/util/url/url.go deleted file mode 100644 index bf4411d..0000000 --- a/util/url/url.go +++ /dev/null @@ -1,20 +0,0 @@ -package util - -import ( - "net/url" - "path" -) - -// Join is concat URL string and path -// ex) http://127.0.0.1/ and /entry -func Join(u string, p string) (string, error) { - var err error - var rURL *url.URL - - if rURL, err = url.Parse(u); nil != err { - return "", err - } - - rURL.Path = path.Join(rURL.Path, p) - return rURL.String(), nil -}