This commit is contained in:
crusader 2017-12-01 22:01:46 +09:00
parent fa3eb473fb
commit d789464eeb
32 changed files with 567 additions and 918 deletions

View File

@ -1,173 +1,173 @@
package auth package auth
import ( import (
"context"
"fmt" "fmt"
"net/http"
"path" "path"
"sync"
"time"
lfcc "git.loafle.net/commons_go/config" cc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module" ooccn "git.loafle.net/overflow/overflow_commons_go/config/noauthprobe"
"git.loafle.net/overflow/overflow_probes/central/client" noauthprobeM "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe/model"
"git.loafle.net/overflow/overflow_probes/commons" probeM "git.loafle.net/overflow/overflow_commons_go/modules/probe/model"
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
"git.loafle.net/overflow/overflow_probes/auth/client"
oopai "git.loafle.net/overflow/overflow_probes/auth/info"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
opuu "git.loafle.net/overflow/overflow_probes/util/url"
)
const (
noAuthEntryPoint = "/auth"
) )
type Auther interface { type Auther interface {
commons.EndableStarter EndableStart(doneChan chan<- error) error
commons.Shutdowner Stop()
} }
type auth struct { type auth struct {
centralClient client.Client doneChan chan<- error
entryURL string
noAuthConfigPath string cClient oogwc.Client
noAuthConfig config.NoAuthProbeConfig
endded chan<- error configPath string
config ooccn.NoAuthProbeConfig
shutdown chan bool tempProbeKeyChan chan string
accepted chan bool acceptChan chan *probeM.Probe
denied chan error denyChan chan *noauthprobeM.NoAuthProbe
stopChan chan struct{}
stopWg sync.WaitGroup
} }
func New() (Auther, error) { func New() Auther {
var err error a := &auth{}
a := &auth{ return a
shutdown: make(chan bool),
accepted: make(chan bool),
denied: make(chan error),
} }
if a.entryURL, err = opuu.Join(config.CFG.Central.URL, noAuthEntryPoint); nil != err { func (a *auth) EndableStart(doneChan chan<- error) error {
return nil, err if nil != a.stopChan {
logging.Logger().Panic("Auth: auth is already running. Stop it before starting it again")
} }
a.noAuthConfigPath = path.Join(*config.ConfigDir, config.NoAuthProbeConfigFileName) a.configPath = path.Join(*config.ConfigDir, ooccn.ConfigFileName)
conf := lfcc.New() conf := cc.New()
if lfcc.Exists(a.noAuthConfigPath) { if cc.Exists(a.configPath) {
if err = conf.Load(&a.noAuthConfig, a.noAuthConfigPath); nil != err { if err := conf.Load(&a.config, a.configPath); nil != err {
return nil, fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err) return fmt.Errorf("Auth: Loading of NoAuth config file[%s] failed error[%v]", a.configPath, err)
} }
} }
a.centralClient = client.New() if nil != a.config.DenyDate {
a.centralClient.OnNotify(a.onNotify) return fmt.Errorf("Cannot start because this probe have been denied from overFlow at %s", a.config.DenyDate.String())
return a, nil
} }
func (a *auth) EndableStart(endded chan<- error) error { registerRequestHeader := ""
a.endded = endded if ooccn.NoAuthProbeStateTypeNotRegisterd == a.config.State() {
return a.start() i, err := oopai.GetRegistHeader()
}
func (a *auth) start() error {
if nil != a.noAuthConfig.DenyDate {
return fmt.Errorf("Cannot start because this probe have been denied from overFlow at %s", a.noAuthConfig.DenyDate.String())
}
var err error
if nil != a.noAuthConfig.TempKey && "" != *a.noAuthConfig.TempKey {
err = a.serveConnect(*a.noAuthConfig.TempKey)
} else {
err = a.serveRegistration()
}
if nil != err { if nil != err {
return err return fmt.Errorf("Auth: Gathering system information has been failed %v", err)
}
registerRequestHeader = i
} }
a.listen() a.tempProbeKeyChan = make(chan string, 1)
a.acceptChan = make(chan *probeM.Probe, 1)
a.denyChan = make(chan *noauthprobeM.NoAuthProbe, 1)
rpcInvoker := initRPCRegistry(a)
ch := client.NewClientHandler(rpcInvoker)
sb := client.NewSocketBuilder(&a.config, a.tempProbeKeyChan, registerRequestHeader)
if nil == sb {
return fmt.Errorf("Auth: Cannot create SocketBuilder")
}
a.cClient = client.NewClient(ch, sb)
a.doneChan = doneChan
a.stopChan = make(chan struct{})
a.stopWg.Add(1)
go a.handleAuth()
return nil return nil
} }
func (a *auth) listen() { func (a *auth) Stop() {
go func() { a.destroy(nil)
for {
select {
case <-a.shutdown:
break
case <-a.accepted:
a.stop(nil)
break
case err := <-a.denied:
a.stop(err)
break
} }
func (a *auth) destroy(err error) {
if a.stopChan == nil {
logging.Logger().Panic("Auth: auth must be started before stopping it")
} }
close(a.stopChan)
a.stopWg.Wait()
a.stopChan = nil
a.cClient.Close()
close(a.tempProbeKeyChan)
close(a.acceptChan)
close(a.denyChan)
logging.Logger().Info(fmt.Sprintf("Auth: stopped"))
a.doneChan <- err
}
func (a *auth) handleAuth() {
var err error
defer func() {
a.stopWg.Done()
a.destroy(err)
}() }()
if err = a.cClient.Connect(); nil != err {
return
} }
func (a *auth) serveRegistration() error {
var err error
header := http.Header{}
var enc string
if enc, err = getRegistHeader(); nil != err {
return err
}
header[module.NoAuthProbeHeader_NoAuthRegist] = []string{enc}
var res *http.Response
if res, err = a.centralClient.Dial(a.entryURL, header); nil != err {
return err
}
tempKey := res.Header.Get(module.NoAuthProbeHeader_SetNoAuthID)
a.noAuthConfig.TempKey = &tempKey
if err = lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err {
return err
}
return nil
}
func (a *auth) serveConnect(noAuthTempKey string) error {
var err error
header := http.Header{}
header[module.NoAuthProbeHeader_NoAuthID] = []string{noAuthTempKey}
var res *http.Response
if res, err = a.centralClient.Dial(a.entryURL, header); nil != err {
return err
}
logging.Logger.Debug(fmt.Sprintf("Auth: Connect HTTP Status[%s]", res.Status))
return nil
}
func (a *auth) Shutdown(ctx context.Context) error {
for { for {
a.stop(fmt.Errorf("Shutdown"))
select { select {
case <-ctx.Done(): case tempProbeKey := <-a.tempProbeKeyChan:
return ctx.Err() err = a.handleTempProbeKey(tempProbeKey)
if nil != err {
return
}
case p := <-a.acceptChan:
err = a.handleAccept(p)
return
case nap := <-a.denyChan:
err = a.handleDeny(nap)
return
case <-a.stopChan:
return
} }
} }
} }
func (a *auth) stop(err error) { func (a *auth) handleTempProbeKey(tempProbeKey string) error {
defer close(a.shutdown) a.config.TempKey = &tempProbeKey
a.shutdown <- true if err := cc.Save(a.config, a.configPath, true); nil != err {
close(a.accepted) return err
close(a.denied)
ctx := context.Background()
if err := a.centralClient.Shutdown(ctx); nil != err {
logging.Logger.Error(fmt.Sprintf("Client of Central: %v", err))
} }
a.endded <- err return nil
}
func (a *auth) handleAccept(p *probeM.Probe) error {
config.Config.Probe.Key = &p.ProbeKey
if err := cc.Save(*config.Config, *config.ConfigFilePath, true); nil != err {
return fmt.Errorf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err)
}
return nil
}
func (a *auth) handleDeny(nap *noauthprobeM.NoAuthProbe) error {
n := time.Now()
a.config.DenyDate = &n
if err := cc.Save(a.config, a.configPath, true); nil != err {
return fmt.Errorf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.configPath, err)
}
return nil
} }

11
auth/client/client.go Normal file
View File

@ -0,0 +1,11 @@
package client
import (
cwfc "git.loafle.net/commons_go/websocket_fasthttp/client"
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
)
func NewClient(ch oogwc.ClientHandler, sb cwfc.SocketBuilder) oogwc.Client {
return oogwc.New(ch, sb)
}

View File

@ -0,0 +1,32 @@
package client
import (
"fmt"
"git.loafle.net/commons_go/logging"
crc "git.loafle.net/commons_go/rpc/client"
crr "git.loafle.net/commons_go/rpc/registry"
oogwc "git.loafle.net/overflow/overflow_gateway_websocket/client"
)
func NewClientHandler(rpcInvoker crr.RPCInvoker) oogwc.ClientHandler {
ch := &ClientHandlers{}
ch.ClientHandler = oogwc.NewClientHandler(rpcInvoker)
return ch
}
type ClientHandlers struct {
oogwc.ClientHandler
}
func (ch *ClientHandlers) Init(clientCTX crc.ClientContext) error {
logging.Logger().Info(fmt.Sprintf("Auth: client has been initialized"))
return nil
}
func (ch *ClientHandlers) Destroy(clientCTX crc.ClientContext) {
logging.Logger().Info(fmt.Sprintf("Auth: client has been destroyed"))
}

View File

@ -0,0 +1,61 @@
package client
import (
"net/http"
"git.loafle.net/commons_go/logging"
cwfc "git.loafle.net/commons_go/websocket_fasthttp/client"
ooccn "git.loafle.net/overflow/overflow_commons_go/config/noauthprobe"
oocmn "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe"
oopcc "git.loafle.net/overflow/overflow_probes/central/client"
)
func NewSocketBuilder(config *ooccn.NoAuthProbeConfig, tempProbeKeyChan chan string, registerRequestHeader string) cwfc.SocketBuilder {
sb := &SocketBuilders{
config: config,
tempProbeKeyChan: tempProbeKeyChan,
registerRequestHeader: registerRequestHeader,
}
sb.SocketBuilders = oopcc.NewSocketBuilder(oocmn.HTTPEntry_NoAuthProbe)
if nil == sb.SocketBuilders {
return nil
}
return sb
}
type SocketBuilders struct {
*oopcc.SocketBuilders
config *ooccn.NoAuthProbeConfig
tempProbeKeyChan chan string
registerRequestHeader string
}
func (sb *SocketBuilders) SocketHandler() cwfc.SocketHandler {
return newSocketHandler(sb.config, sb.tempProbeKeyChan)
}
func (sb *SocketBuilders) GetRequestHeader() http.Header {
reqHeader := http.Header{}
switch sb.config.State() {
case ooccn.NoAuthProbeStateTypeNotRegisterd:
reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_Method] = []string{oocmn.HTTPRequestHeaderValue_NoAuthProbe_Method_Regist}
reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_Info] = []string{sb.registerRequestHeader}
case ooccn.NoAuthProbeStateTypeRegisterd:
reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_Method] = []string{oocmn.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect}
reqHeader[oocmn.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey] = []string{oocmn.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect}
}
return reqHeader
}
func (sb *SocketBuilders) Validate() {
sb.SocketBuilders.Validate()
if nil == sb.config {
logging.Logger().Panic("Auth: NoAuthProbeConfig must be specified")
}
}

View File

@ -0,0 +1,42 @@
package client
import (
"fmt"
"net/http"
"git.loafle.net/commons_go/logging"
cwfc "git.loafle.net/commons_go/websocket_fasthttp/client"
ooccn "git.loafle.net/overflow/overflow_commons_go/config/noauthprobe"
oocmn "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe"
)
type SocketHandlers struct {
cwfc.SocketHandlers
config *ooccn.NoAuthProbeConfig
tempProbeKeyChan chan string
}
func (sh *SocketHandlers) OnConnect(socketContext cwfc.SocketContext, res *http.Response) {
logging.Logger().Info(fmt.Sprintf("Auth: client has been connected res[%v]", res))
switch sh.config.State() {
case ooccn.NoAuthProbeStateTypeNotRegisterd:
tempProbeKey := res.Header.Get(oocmn.HTTPResponseHeaderKey_NoAuthProbe_SetTempProbeKey)
sh.tempProbeKeyChan <- tempProbeKey
case ooccn.NoAuthProbeStateTypeRegisterd:
}
}
func (sh *SocketHandlers) OnDisconnect(soc cwfc.Socket) {
logging.Logger().Info(fmt.Sprintf("Auth: client has been disconnected soc[%v]", soc))
}
func newSocketHandler(config *ooccn.NoAuthProbeConfig, tempProbeKeyChan chan string) cwfc.SocketHandler {
return &SocketHandlers{
config: config,
tempProbeKeyChan: tempProbeKeyChan,
}
}

View File

@ -1,4 +1,4 @@
package auth package info
import ( import (
"bytes" "bytes"
@ -8,23 +8,24 @@ import (
"net" "net"
"git.loafle.net/commons_go/util/net/gateway" "git.loafle.net/commons_go/util/net/gateway"
"git.loafle.net/overflow/overflow_probes/central/api/module" noauthprobeM "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe/model"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
"github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/host"
) )
func getRegistHeader() (string, error) { func GetRegistHeader() (string, error) {
var err error var err error
nap := module.NoAuthProbe{ nap := noauthprobeM.NoAuthProbe{
APIKey: config.CFG.Central.APIKey, APIKey: config.Config.Central.APIKey,
} }
var nad *module.NoAuthProbeDescription
if nad, err = getDescription(); nil != err { var napd *noauthprobeM.NoAuthProbeDescription
if napd, err = getDescription(); nil != err {
return "", err return "", err
} }
var buf []byte var buf []byte
if buf, err = json.Marshal(nad); nil != err { if buf, err = json.Marshal(napd); nil != err {
return "", err return "", err
} }
nap.Description = string(buf) nap.Description = string(buf)
@ -38,46 +39,49 @@ func getRegistHeader() (string, error) {
return enc, nil return enc, nil
} }
func getDescription() (*module.NoAuthProbeDescription, error) { func getDescription() (*noauthprobeM.NoAuthProbeDescription, error) {
nad := &module.NoAuthProbeDescription{} var err error
napd := &noauthprobeM.NoAuthProbeDescription{}
if err := getHost(&nad.Host); nil != err { if napd.Host, err = getHost(); nil != err {
return nil, err return nil, err
} }
if err := getNetwork(&nad.Network); nil != err { if napd.Network, err = getNetwork(); nil != err {
return nil, err return nil, err
} }
return nad, nil return napd, nil
} }
func getHost(h *module.NoAuthProbeDescriptionHost) error { func getHost() (*noauthprobeM.NoAuthProbeDescriptionHost, error) {
if i, err := host.Info(); nil == err { if i, err := host.Info(); nil == err {
h := &noauthprobeM.NoAuthProbeDescriptionHost{}
h.Name = i.Hostname h.Name = i.Hostname
h.OS = i.OS h.OS = i.OS
h.Platform = i.Platform h.Platform = i.Platform
h.PlatformFamily = i.PlatformFamily h.PlatformFamily = i.PlatformFamily
h.KernelVersion = i.KernelVersion h.KernelVersion = i.KernelVersion
h.HostID = i.HostID h.HostID = i.HostID
return h, nil
} else { } else {
return err return nil, err
}
} }
return nil func getNetwork() (*noauthprobeM.NoAuthProbeDescriptionNetwork, error) {
}
func getNetwork(n *module.NoAuthProbeDescriptionNetwork) error {
var ip net.IP var ip net.IP
var iface string var iface string
var err error var err error
if ip, iface, err = gateway.DiscoverGateway(); nil != err { if ip, iface, err = gateway.DiscoverGateway(); nil != err {
return err return nil, err
} }
interfaces, err := net.Interfaces() interfaces, err := net.Interfaces()
if err != nil { if err != nil {
return err return nil, err
} }
idx := -1 idx := -1
@ -90,9 +94,11 @@ func getNetwork(n *module.NoAuthProbeDescriptionNetwork) error {
} }
if -1 == idx { if -1 == idx {
return errors.New("Interface of gateway is not exist") return nil, errors.New("Interface of gateway is not exist")
} }
n := &noauthprobeM.NoAuthProbeDescriptionNetwork{}
i := interfaces[idx] i := interfaces[idx]
n.Name = i.Name n.Name = i.Name
@ -109,8 +115,8 @@ func getNetwork(n *module.NoAuthProbeDescriptionNetwork) error {
} }
n.Address = buffer.String() n.Address = buffer.String()
} else { } else {
return err return nil, err
} }
return nil return n, nil
} }

View File

@ -1,55 +0,0 @@
package auth
import (
"fmt"
"time"
lfcc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/config"
)
func (a *auth) onNotify(method string, params []string) {
switch method {
case module.NoAuthProbeService_AcceptNoAuthProbe:
a.onNoAuthProbeAccept(params)
break
case module.NoAuthProbeService_DenyNoauthProbe:
a.onNoAuthProbeDeny(params)
break
}
}
func (a *auth) onNoAuthProbeAccept(params []string) {
var err error
probeKey := params[0]
// if lfcc.Exists(a.probeConfigPath) {
// if err = lfcc.Load(&a.probeConfig, a.probeConfigPath); nil != err {
// logging.Logger.Error(fmt.Sprintf("Auth: Loading of Probe config file[%s] failed error[%v]", a.probeConfigPath, err))
// }
// }
config.CFG.Probe.Key = &probeKey
if err = lfcc.Save(*config.CFG, *config.ConfigFilePath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of config file[%s] failed error[%v]", *config.ConfigFilePath, err))
return
}
a.accepted <- true
}
func (a *auth) onNoAuthProbeDeny(params []string) {
n := time.Now()
a.noAuthConfig.DenyDate = &n
if err := lfcc.Save(a.noAuthConfig, a.noAuthConfigPath, true); nil != err {
logging.Logger.Error(fmt.Sprintf("Auth: Saving of NoAuth config file[%s] failed error[%v]", a.noAuthConfigPath, err))
return
}
a.denied <- fmt.Errorf("This probe have been denied from overFlow")
}

18
auth/rpc.go Normal file
View File

@ -0,0 +1,18 @@
package auth
import (
crr "git.loafle.net/commons_go/rpc/registry"
oopar "git.loafle.net/overflow/overflow_probes/auth/rpc"
)
func initRPCRegistry(a *auth) crr.RPCInvoker {
rpcRegistry := crr.NewRPCRegistry()
napService := &oopar.NoAuthProbeService{
AcceptChan: a.acceptChan,
DenyChan: a.denyChan,
}
rpcRegistry.RegisterService(napService, "")
return rpcRegistry
}

View File

@ -0,0 +1,20 @@
package rpc
import (
noauthprobeM "git.loafle.net/overflow/overflow_commons_go/modules/noauthprobe/model"
probeM "git.loafle.net/overflow/overflow_commons_go/modules/probe/model"
)
type NoAuthProbeService struct {
AcceptChan chan *probeM.Probe
DenyChan chan *noauthprobeM.NoAuthProbe
}
func (s *NoAuthProbeService) Accept(probe *probeM.Probe) {
s.AcceptChan <- probe
}
func (s *NoAuthProbeService) Deny(noAuthProbe *noauthprobeM.NoAuthProbe) {
s.DenyChan <- noAuthProbe
}

1
auth/rpc/rpc.go Normal file
View File

@ -0,0 +1 @@
package rpc

View File

@ -1,7 +0,0 @@
package module
const (
CrawlerService_Install = "CrawlerService.install"
CrawlerService_Uninstall = "CrawlerService.uninstall"
CrawlerService_Update = "CrawlerService.update"
)

View File

@ -1,6 +0,0 @@
package module
const (
DiscoveryService_Start = "DiscoveryService.start"
DiscoveryService_Stop = "DiscoveryService.stop"
)

View File

@ -1,5 +0,0 @@
package module
const (
LogService_Send = "LogService.send"
)

View File

@ -1,45 +0,0 @@
package module
import "time"
const (
NoAuthProbeHeader_NoAuthID = "overFlow-NoAuth-ID"
NoAuthProbeHeader_NoAuthRegist = "overFlow-NoAuth-Regist"
NoAuthProbeHeader_SetNoAuthID = "overFlow-Set-NoAuth-ID"
)
type NoAuthProbe struct {
ID uint64 `json:"id"`
Description string `json:"description"`
TempProbeKey string `json:"tempProbeKey"`
CreateDate time.Time `json:"createDate"`
APIKey string `json:"apiKey"`
}
const (
NoAuthProbeService_Regist = "NoAuthProbeService.regist"
NoAuthProbeService_AcceptNoAuthProbe = "NoAuthProbeService.acceptNoAuthProbe"
NoAuthProbeService_DenyNoauthProbe = "NoAuthProbeService.denyNoauthProbe"
)
type NoAuthProbeDescription struct {
Host NoAuthProbeDescriptionHost `json:"host"`
Network NoAuthProbeDescriptionNetwork `json:"network"`
}
type NoAuthProbeDescriptionHost struct {
Name string `json:"name"`
OS string `json:"os"`
Platform string `json:"paltform"`
PlatformFamily string `json:"platformFamily"`
PlatformVersion string `json:"platformVersion"`
KernelVersion string `json:"kernelVersion"`
HostID string `json:"hostID"`
}
type NoAuthProbeDescriptionNetwork struct {
Name string `json:"name"`
Address string `json:"address"`
Gateway string `json:"gateway"`
MacAddress string `json:"macAddress"`
}

View File

@ -1,22 +0,0 @@
package module
import "time"
const (
ProbeHeader_ProbeKey = "overFlow-Probe-Key"
ProbeHeader_Probe_EncryptionKey = "overFlow-Probe-EncryptionKey"
)
const (
ProbeService_Started = "ProbeService.started"
ProbeService_Stopped = "ProbeService.stopped"
ProbeService_Update = "ProbeService.update"
)
type Probe struct {
ID uint64 `json:"id"`
Description string `json:"description"`
TempProbeKey string `json:"tempProbeKey"`
CreateDate time.Time `json:"createDate"`
ProbeKey string `json:"probeKey"`
}

View File

@ -1,9 +0,0 @@
package module
const (
SensorService_Start = "SensorService.start"
SensorService_Stop = "SensorService.stop"
SensorService_Add = "SensorService.add"
SensorService_Remove = "SensorService.remove"
SensorService_Update = "SensorService.update"
)

View File

@ -1,314 +1 @@
package client package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/client/protocol"
"git.loafle.net/overflow/overflow_probes/config"
"github.com/gorilla/websocket"
)
const (
ProtocolName = "RPC/1.0"
)
type (
OnNotifyFunc func(method string, params []string)
OnCloseFunc func(code int, text string)
)
type ServerError string
func (e ServerError) Error() string {
return string(e)
}
var ErrShutdown = errors.New("connection is shut down")
type Call struct {
Method string // The name of the service and method to call.
Args []string // The argument to the function (*struct).
Result interface{} // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Strobes when call is complete.
}
func (c *Call) done() {
select {
case c.Done <- c:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
logging.Logger.Debug("Client: discarding Call reply due to insufficient Done chan capacity")
}
}
type Client interface {
Dial(url string, header http.Header) (*http.Response, error)
Call(method string, args []string, result interface{}) error
Notify(method string, args []string) error
OnNotify(cb OnNotifyFunc)
OnClose(cb OnCloseFunc)
Shutdown(ctx context.Context) error
}
type client struct {
conn *websocket.Conn
sendMutex sync.Mutex
request protocol.Request
notification protocol.Notification
mutex sync.Mutex
requestID uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
onNotifyHandler OnNotifyFunc
onCloseHandlers []OnCloseFunc
}
func New() Client {
c := &client{
requestID: 0,
pending: make(map[uint64]*Call),
onCloseHandlers: make([]OnCloseFunc, 1),
}
return c
}
func (c *client) Dial(url string, header http.Header) (*http.Response, error) {
var err error
var res *http.Response
dialer := websocket.Dialer{
ReadBufferSize: config.CFG.Central.ReadBufferSize,
WriteBufferSize: config.CFG.Central.WriteBufferSize,
}
if c.conn, res, err = dialer.Dial(url, header); nil != err {
return nil, err
}
c.conn.SetCloseHandler(c.connCloseHandler)
go c.input()
return res, nil
}
func (c *client) Call(method string, args []string, result interface{}) error {
call := <-c.goCall(method, args, result, make(chan *Call, 1)).Done
return call.Error
}
func (c *client) Notify(method string, args []string) error {
c.sendMutex.Lock()
defer c.sendMutex.Unlock()
c.notification.Protocol = ProtocolName
c.notification.Method = method
c.notification.Params = args
if err := c.conn.WriteJSON(c.notification); nil != err {
return err
}
return nil
}
func (c *client) OnNotify(cb OnNotifyFunc) {
c.onNotifyHandler = cb
}
func (c *client) OnClose(cb OnCloseFunc) {
c.onCloseHandlers = append(c.onCloseHandlers, cb)
}
func (c *client) Shutdown(ctx context.Context) error {
c.mutex.Lock()
if c.closing {
c.mutex.Unlock()
return ErrShutdown
}
c.closing = true
c.mutex.Unlock()
return c.conn.Close()
}
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (c *client) goCall(method string, args []string, result interface{}, done chan *Call) *Call {
call := new(Call)
call.Method = method
call.Args = args
call.Result = result
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
logging.Logger.Panic("Client: done channel is unbuffered")
}
}
call.Done = done
c.sendCall(call)
return call
}
func (c *client) sendCall(call *Call) {
c.sendMutex.Lock()
defer c.sendMutex.Unlock()
// Register this call.
c.mutex.Lock()
if c.shutdown || c.closing {
call.Error = ErrShutdown
c.mutex.Unlock()
call.done()
return
}
c.requestID++
id := c.requestID
c.pending[id] = call
c.mutex.Unlock()
// Encode and send the request.
c.request.Protocol = ProtocolName
c.request.Method = call.Method
c.request.Params = call.Args
c.request.ID = id
if err := c.conn.WriteJSON(c.request); nil != err {
c.mutex.Lock()
call = c.pending[id]
delete(c.pending, id)
c.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
func (c *client) input() {
var err error
var res protocol.Response
var noti protocol.Notification
var messageType int
var buff []byte
for err == nil {
if messageType, buff, err = c.conn.ReadMessage(); nil != err {
logging.Logger.Error(fmt.Sprintf("Client: Reader error[%v]", err))
continue
}
logging.Logger.Debug(fmt.Sprintf("Client: messageType:%d", messageType))
if err = json.Unmarshal(buff, &noti); nil != err {
if err = json.Unmarshal(buff, &res); nil != err {
logging.Logger.Error(fmt.Sprintf("Client: Decode error[%v]", err))
continue
} else {
if err = c.onResponse(res); nil != err {
logging.Logger.Error(fmt.Sprintf("Client: Response error[%v]", err))
continue
}
}
} else {
c.onNotification(noti)
}
// if err = json.Unmarshal(buff, &res); nil != err {
// if err = json.Unmarshal(buff, &noti); nil != err {
// logging.Logger.Error(fmt.Sprintf("Client: Decode error[%v]", err))
// continue
// } else {
// c.onNotification(noti)
// }
// } else {
// err = c.onResponse(res)
// }
}
// Terminate pending calls.
c.sendMutex.Lock()
c.mutex.Lock()
c.shutdown = true
closing := c.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range c.pending {
call.Error = err
call.done()
}
c.mutex.Unlock()
c.sendMutex.Unlock()
if err != io.EOF && !closing {
logging.Logger.Debug(fmt.Sprintf("Client: client protocol error:%v", err))
}
}
func (c *client) onResponse(res protocol.Response) error {
var err error
id := res.ID
c.mutex.Lock()
call := c.pending[id]
delete(c.pending, id)
c.mutex.Unlock()
switch {
case call == nil:
case res.Error != nil:
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
if protocol.ProtocolErrorCodeInternal == res.Error.Code {
if nil != res.Error.Message {
call.Error = ServerError(*res.Error.Message)
}
}
call.done()
default:
if err = json.Unmarshal(*res.Result, call.Result); nil != err {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
return err
}
func (c *client) onNotification(noti protocol.Notification) {
if nil == c.onNotifyHandler {
return
}
c.onNotifyHandler(noti.Method, noti.Params)
}
func (c *client) connCloseHandler(code int, text string) error {
for _, h := range c.onCloseHandlers {
h(code, text)
}
return nil
}

View File

@ -1,21 +0,0 @@
package client
import (
"net/http"
"git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/config"
)
func ConnectToCentralAsProbe(c Client, entryURL string) (*http.Response, error) {
header := http.Header{}
header[module.ProbeHeader_ProbeKey] = []string{*config.CFG.Probe.Key}
var res *http.Response
var err error
if res, err = c.Dial(entryURL, header); nil != err {
return nil, err
}
return res, nil
}

View File

@ -1,5 +0,0 @@
package protocol
type Header struct {
Protocol string `json:"protocol"`
}

View File

@ -1,7 +0,0 @@
package protocol
type Notification struct {
Header
Method string `json:"method"`
Params []string `json:"params,omitempty"`
}

View File

@ -1,19 +0,0 @@
package protocol
type ProtocolErrorCode int
const (
ProtocolErrorCodeParse ProtocolErrorCode = -32700
ProtocolErrorCodeInvalidRequest ProtocolErrorCode = -32600
ProtocolErrorCodeNotFoundMethod ProtocolErrorCode = -32601
ProtocolErrorCodeInvalidParams ProtocolErrorCode = -32602
ProtocolErrorCodeInternal ProtocolErrorCode = -32603
// -32000 ~ -32099
ProtocolErrorCodeServer ProtocolErrorCode = -32000
)
type ProtocolError struct {
Code ProtocolErrorCode `json:"code"`
Message *string `json:"message"`
Data interface{} `json:"data"`
}

View File

@ -1,6 +0,0 @@
package protocol
type Request struct {
Notification
ID uint64 `json:"id,omitempty"`
}

View File

@ -1,10 +0,0 @@
package protocol
import "encoding/json"
type Response struct {
Header
ID uint64 `json:"id"`
Result *json.RawMessage `json:"result,omitempty"`
Error *ProtocolError `json:"error,omitempty"`
}

View File

@ -0,0 +1,28 @@
package client
import (
"fmt"
"git.loafle.net/commons_go/logging"
cunu "git.loafle.net/commons_go/util/net/url"
cwfc "git.loafle.net/commons_go/websocket_fasthttp/client"
"git.loafle.net/overflow/overflow_probes/config"
)
func NewSocketBuilder(entryPath string) *SocketBuilders {
sb := &SocketBuilders{}
url, err := cunu.Join(config.Config.Central.URL, entryPath)
if nil != err {
logging.Logger().Error(fmt.Sprintf("Auth: Cannot create SocketBuilder %v", err))
return nil
}
sb.URL = url
sb.TLSConfig = nil
return sb
}
type SocketBuilders struct {
cwfc.SocketBuilders
}

View File

@ -1,28 +1,13 @@
package config package config
const ( import (
ConfigFileName = "config.json" ooccp "git.loafle.net/overflow/overflow_commons_go/config/probe"
) )
var ( var (
ConfigDir *string ConfigDir *string
ConfigFilePath *string ConfigFilePath *string
CFG *Config
EncryptionKey *string EncryptionKey *string
Config *ooccp.Config
) )
type Config struct {
Central CentralConfig `json:"central" yaml:"central" toml:"central"`
Probe ProbeConfig `json:"probe" yaml:"probe" toml:"probe"`
}
type CentralConfig struct {
URL string `required:"true" json:"url" yaml:"url" toml:"url"`
APIKey string `required:"true" json:"apiKey" yaml:"apiKey" toml:"apiKey"`
ReadBufferSize int `default:"8192" json:"readBufferSize" yaml:"readBufferSize" toml:"readBufferSize"`
WriteBufferSize int `default:"8192" json:"writeBufferSize" yaml:"writeBufferSize" toml:"writeBufferSize"`
}
type ProbeConfig struct {
Key *string `json:"key,omitempty" yaml:"key" toml:"key"`
}

View File

@ -1,12 +0,0 @@
package config
import "time"
const (
NoAuthProbeConfigFileName = "noauthprobe.json"
)
type NoAuthProbeConfig struct {
TempKey *string `json:"tempKey,omitempty" yaml:"tempKey" toml:"tempKey"`
DenyDate *time.Time `json:"denyDate,omitempty" yaml:"denyDate" toml:"denyDate"`
}

6
constants.go Normal file
View File

@ -0,0 +1,6 @@
package main
const (
ConfigPathFlagName = "config-dir"
ConfigFileName = "config.json"
)

View File

@ -9,3 +9,5 @@ import:
version: v2.17.08 version: v2.17.08
- package: github.com/takama/daemon - package: github.com/takama/daemon
version: 0.9.1 version: 0.9.1
- package: github.com/dgrijalva/jwt-go
version: v3.1.0

89
main.go
View File

@ -10,13 +10,13 @@ import (
"syscall" "syscall"
"time" "time"
"git.loafle.net/overflow/overflow_probes/commons"
lfcc "git.loafle.net/commons_go/config" lfcc "git.loafle.net/commons_go/config"
"git.loafle.net/commons_go/logging" "git.loafle.net/commons_go/logging"
oocc "git.loafle.net/overflow/overflow_commons_go/config"
ooccp "git.loafle.net/overflow/overflow_commons_go/config/probe"
"git.loafle.net/overflow/overflow_probes/auth" "git.loafle.net/overflow/overflow_probes/auth"
"git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" "git.loafle.net/overflow/overflow_probes/config"
"git.loafle.net/overflow/overflow_probes/probe"
) )
/* /*
@ -38,7 +38,6 @@ import (
// ) // )
var ( var (
daemonCommand *string
configDir *string configDir *string
) )
@ -49,59 +48,45 @@ func init() {
// flag.PrintDefaults() // flag.PrintDefaults()
// } // }
configDir = flag.String("config-dir", ".", "The directory of config") configDir = oocc.FlagConfigDir()
flag.Parse() flag.Parse()
} }
func main() { func main() {
var err error defer logging.Logger().Sync()
var instance interface{}
defer logging.Logger.Sync()
printBanner() printBanner()
loadConfig()
if dir, err := lfcc.ABSPathify(*configDir); nil != err { var instance interface{}
logging.Logger.Panic(fmt.Sprintf("Config path[%s] is not valid", *configDir))
} else {
logging.Logger.Debug(fmt.Sprintf("Config path: %s", dir))
config.ConfigDir = &dir
}
cfp := path.Join(*config.ConfigDir, config.ConfigFileName)
config.ConfigFilePath = &cfp
conf := lfcc.New()
config.CFG = &config.Config{}
if err := conf.Load(config.CFG, *config.ConfigFilePath); nil != err {
logging.Logger.Panic(fmt.Sprintf("Config is not valid: %v", err))
}
go func() { go func() {
if nil == config.CFG.Probe.Key || "" == *config.CFG.Probe.Key { if ooccp.ProbeStateTypeNotAuthorized == config.Config.Probe.State() {
if instance, err = auth.New(); nil != err { var err error
logging.Logger.Error(fmt.Sprintf("Auth error: %v", err)) instance = auth.New()
authDoneChan := make(chan error, 1)
defer close(authDoneChan)
if err := instance.(commons.EndableStarter).EndableStart(authDoneChan); err != nil {
logging.Logger().Error(err.Error())
return return
} }
endded := make(chan error) err = <-authDoneChan
defer close(endded) if nil != err {
if err := instance.(commons.EndableStarter).EndableStart(endded); err != nil { logging.Logger().Error(err.Error())
logging.Logger.Error(fmt.Sprintf("Auther error: %v", err))
return
}
if err := <-endded; nil != err {
logging.Logger.Error(fmt.Sprintf("Auther error: %v", err))
return return
} }
} }
if instance, err = probe.New(); nil != err { // if instance, err = probe.New(); nil != err {
logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) // logging.Logger().Error(fmt.Sprintf("Probe error: %v", err))
return // return
} // }
if err := instance.(commons.Starter).Start(); err != nil { // if err := instance.(commons.Starter).Start(); err != nil {
logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) // logging.Logger().Error(fmt.Sprintf("Probe error: %v", err))
return // return
} // }
}() }()
@ -121,7 +106,25 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if err := instance.(commons.Shutdowner).Shutdown(ctx); err != nil { if err := instance.(commons.Shutdowner).Shutdown(ctx); err != nil {
logging.Logger.Error(fmt.Sprintf("Probe error: %v", err)) logging.Logger().Error(fmt.Sprintf("Probe error: %v", err))
}
}
func loadConfig() {
if dir, err := lfcc.ABSPathify(*configDir); nil != err {
logging.Logger().Panic(fmt.Sprintf("Config path[%s] is not valid", *configDir))
} else {
logging.Logger().Debug(fmt.Sprintf("Config path: %s", dir))
config.ConfigDir = &dir
}
cfp := path.Join(*config.ConfigDir, ooccp.ConfigFileName)
config.ConfigFilePath = &cfp
conf := lfcc.New()
config.Config = &ooccp.Config{}
if err := conf.Load(config.Config, *config.ConfigFilePath); nil != err {
logging.Logger().Panic(fmt.Sprintf("Config is not valid: %v", err))
} }
} }

View File

@ -1,55 +1,55 @@
package probe package probe
import ( // import (
"fmt" // "fmt"
"git.loafle.net/commons_go/logging" // "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module" // "git.loafle.net/overflow/overflow_probes/central/api/module"
) // )
func (p *probe) onNotify(method string, params []string) { // func (p *probe) onNotify(method string, params []string) {
var err error // var err error
switch method { // switch method {
case module.CrawlerService_Install: // case module.CrawlerService_Install:
break // break
case module.CrawlerService_Uninstall: // case module.CrawlerService_Uninstall:
break // break
case module.CrawlerService_Update: // case module.CrawlerService_Update:
break // break
case module.SensorService_Start: // case module.SensorService_Start:
break // break
case module.SensorService_Stop: // case module.SensorService_Stop:
break // break
case module.SensorService_Add: // case module.SensorService_Add:
break // break
case module.SensorService_Remove: // case module.SensorService_Remove:
break // break
case module.SensorService_Update: // case module.SensorService_Update:
break // break
case module.ProbeService_Update: // case module.ProbeService_Update:
break // break
case module.LogService_Send: // case module.LogService_Send:
break // break
case module.DiscoveryService_Start: // case module.DiscoveryService_Start:
break // break
case module.DiscoveryService_Stop: // case module.DiscoveryService_Stop:
break // break
} // }
if nil != err { // if nil != err {
logging.Logger.Error(fmt.Sprintf("Probe notify error: %v", err)) // logging.Logger().Error(fmt.Sprintf("Probe notify error: %v", err))
} // }
} // }

View File

@ -1,132 +1,128 @@
package probe package probe
import ( // import (
"context" // "context"
"fmt" // "fmt"
"net/http" // "net/http"
"git.loafle.net/commons_go/logging" // "git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_probes/central/api/module" // "git.loafle.net/overflow/overflow_probes/central/api/module"
"git.loafle.net/overflow/overflow_probes/central/client" // "git.loafle.net/overflow/overflow_probes/central/client"
"git.loafle.net/overflow/overflow_probes/commons" // "git.loafle.net/overflow/overflow_probes/commons"
"git.loafle.net/overflow/overflow_probes/config" // "git.loafle.net/overflow/overflow_probes/config"
opuu "git.loafle.net/overflow/overflow_probes/util/url" // opuu "git.loafle.net/overflow/overflow_probes/util/url"
) // )
const ( // const (
probeEntryPoint = "/probe" // probeEntryPoint = "/probe"
fileEntryPoint = "/file" // fileEntryPoint = "/file"
metricEntryPoint = "/metric" // metricEntryPoint = "/metric"
) // )
type Prober interface { // type Prober interface {
commons.Starter // commons.Starter
commons.Shutdowner // commons.Shutdowner
} // }
type probe struct { // type probe struct {
probeEntryURL string // probeEntryURL string
fileEntryURL string // fileEntryURL string
metricEntryURL string // metricEntryURL string
probeClient client.Client // probeClient client.Client
fileClient client.Client // fileClient client.Client
metricClient client.Client // metricClient client.Client
shutdown chan bool // shutdown chan bool
} // }
func New() (Prober, error) { // func New() (Prober, error) {
p := &probe{ // p := &probe{
shutdown: make(chan bool), // shutdown: make(chan bool),
} // }
var err error // var err error
if p.probeEntryURL, err = opuu.Join(config.CFG.Central.URL, probeEntryPoint); nil != err { // if p.probeEntryURL, err = opuu.Join(config.Config.Central.URL, probeEntryPoint); nil != err {
return nil, err // return nil, err
} // }
if p.fileEntryURL, err = opuu.Join(config.CFG.Central.URL, fileEntryPoint); nil != err { // if p.fileEntryURL, err = opuu.Join(config.Config.Central.URL, fileEntryPoint); nil != err {
return nil, err // return nil, err
} // }
if p.metricEntryURL, err = opuu.Join(config.CFG.Central.URL, metricEntryPoint); nil != err { // if p.metricEntryURL, err = opuu.Join(config.Config.Central.URL, metricEntryPoint); nil != err {
return nil, err // return nil, err
} // }
p.probeClient = client.New() // p.probeClient = client.New()
p.fileClient = client.New() // p.fileClient = client.New()
p.metricClient = client.New() // p.metricClient = client.New()
return p, nil // return p, nil
} // }
func (p *probe) Start() error { // func (p *probe) Start() error {
// if err := p.connectToCentral(); nil != err {
return p.start()
}
func (p *probe) start() error {
if err := p.connectToCentral(); nil != err {
return err
}
p.listen()
return nil
}
func (p *probe) listen() {
go func() {
for {
select {
case <-p.shutdown:
break
}
}
}()
}
func (p *probe) connectToCentral() error {
var err error
var res *http.Response
if res, err = client.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err {
return err
}
encryptionKey := res.Header.Get(module.ProbeHeader_Probe_EncryptionKey)
config.EncryptionKey = &encryptionKey
p.probeClient.OnNotify(p.onNotify)
// if _, err = client.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err {
// return err // return err
// } // }
return nil // p.listen()
}
func (p *probe) sendNotifyToCentral(method string, params ...string) { // return nil
if err := p.probeClient.Notify(method, params); nil != err { // }
logging.Logger.Error(fmt.Sprintf("Probe notify: %v", err))
}
}
func (p *probe) Shutdown(ctx context.Context) error { // func (p *probe) listen() {
for { // go func() {
p.stop(fmt.Errorf("Shutdown")) // for {
select { // select {
case <-ctx.Done(): // case <-p.shutdown:
return ctx.Err() // break
} // }
} // }
} // }()
func (p *probe) stop(err error) { // }
defer close(p.shutdown)
ctx := context.Background() // func (p *probe) connectToCentral() error {
if err := p.probeClient.Shutdown(ctx); nil != err { // var err error
logging.Logger.Error(fmt.Sprintf("Client of Probe: %v", err)) // var res *http.Response
} // if res, err = client.ConnectToCentralAsProbe(p.probeClient, p.probeEntryURL); nil != err {
// return err
// }
} // encryptionKey := res.Header.Get(module.ProbeHeader_Probe_EncryptionKey)
// config.EncryptionKey = &encryptionKey
// p.probeClient.OnNotify(p.onNotify)
// // if _, err = client.ConnectToCentralAsProbe(p.metricClient, p.metricEntryURL); nil != err {
// // return err
// // }
// return nil
// }
// func (p *probe) sendNotifyToCentral(method string, params ...string) {
// if err := p.probeClient.Notify(method, params); nil != err {
// logging.Logger().Error(fmt.Sprintf("Probe notify: %v", err))
// }
// }
// func (p *probe) Shutdown(ctx context.Context) error {
// for {
// p.stop(fmt.Errorf("Shutdown"))
// select {
// case <-ctx.Done():
// return ctx.Err()
// }
// }
// }
// func (p *probe) stop(err error) {
// defer close(p.shutdown)
// ctx := context.Background()
// if err := p.probeClient.Shutdown(ctx); nil != err {
// logging.Logger().Error(fmt.Sprintf("Client of Probe: %v", err))
// }
// }

View File

@ -1,20 +0,0 @@
package util
import (
"net/url"
"path"
)
// Join is concat URL string and path
// ex) http://127.0.0.1/ and /entry
func Join(u string, p string) (string, error) {
var err error
var rURL *url.URL
if rURL, err = url.Parse(u); nil != err {
return "", err
}
rURL.Path = path.Join(rURL.Path, p)
return rURL.String(), nil
}