183 lines
3.4 KiB
Go
183 lines
3.4 KiB
Go
package data_sender
|
|
|
|
import (
|
|
"encoding/json"
|
|
cm "git.loafle.net/overflow/overflow_probe/agent_api/config_manager"
|
|
"git.loafle.net/overflow/overflow_probe/agent_api/messages"
|
|
pb "git.loafle.net/overflow/overflow_probe/crawler/grpc"
|
|
q "git.loafle.net/overflow/overflow_probe/queue"
|
|
log "github.com/cihub/seelog"
|
|
"io/ioutil"
|
|
"os"
|
|
"sync"
|
|
)
|
|
|
|
// Package-level settings used by the data sender.
const (
	// FILE_PATH is the file that holds collected data which could not be
	// delivered. It is opened by tempFile, read by getFailedData and deleted
	// by removeFailed.
	FILE_PATH = "/overflow/tmp/data.tmp"

	// DEFAULT_INTERVAL is the interval value passed to q.NewQueue in start.
	// NOTE(review): the unit is defined by the queue package — confirm there.
	DEFAULT_INTERVAL = 10
)
|
|
|
|
var (
|
|
instance *DataSender
|
|
once sync.Once
|
|
)
|
|
|
|
type DataSender struct {
|
|
once sync.Once
|
|
queue *q.LoafleQueue
|
|
gconf *cm.GlobalConfig
|
|
}
|
|
|
|
func Start(ch chan bool, conf *cm.GlobalConfig) {
|
|
d := GetInstance()
|
|
d.start(conf)
|
|
ch <- true
|
|
}
|
|
|
|
func Stop(ch chan bool) {
|
|
GetInstance().stop()
|
|
ch <- true
|
|
}
|
|
|
|
func GetInstance() *DataSender {
|
|
once.Do(func() {
|
|
instance = &DataSender{}
|
|
})
|
|
return instance
|
|
}
|
|
|
|
func (ds *DataSender) start(conf *cm.GlobalConfig) {
|
|
qc := make(chan interface{})
|
|
ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc)
|
|
ds.gconf = conf
|
|
go ds.handleData(qc)
|
|
}
|
|
|
|
// stop is currently a no-op.
//
// NOTE(review): the goroutine launched by start is not stopped here, and the
// queue is not released.
func (ds *DataSender) stop() {
	// Nothing to release yet.
}
|
|
|
|
func (ds *DataSender) handleData(qc chan interface{}) {
|
|
for {
|
|
select {
|
|
case items := <-qc:
|
|
result := make([]*messages.Data, 0)
|
|
for _, item := range items.([]*q.Item) {
|
|
collectedData := item.Value.(*pb.Output)
|
|
d := &messages.Data{}
|
|
d.Data = collectedData.Data
|
|
result = append(result, d)
|
|
}
|
|
ds.send(result)
|
|
}
|
|
}
|
|
}
|
|
|
|
func AddData(data interface{}) {
|
|
ds := GetInstance()
|
|
ds.queue.PushItem(data)
|
|
}
|
|
|
|
func (ds *DataSender) send(data []*messages.Data) {
|
|
//for _, v := range data {
|
|
// //log.Printf("SEND SENSOR RESULT : %s - %s", v.SensorId, v.Data)
|
|
//}
|
|
|
|
//ds.addFailedData(data)
|
|
//addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port)
|
|
//conn, err := grpc.Dial(addr, grpc.WithInsecure())
|
|
//if err != nil {
|
|
// ds.saveFailedData(data)
|
|
// return
|
|
//}
|
|
//defer conn.Close()
|
|
|
|
//temporary
|
|
//client := pb.NewStatusClient(conn)
|
|
//out, err := client.Status(context.Background(), &pb.Empty{})
|
|
//if err != nil {
|
|
// ds.saveFailedData(data)
|
|
// return
|
|
//}
|
|
//ds.removeFailed()
|
|
//log.Print(out)
|
|
}
|
|
|
|
func (ds *DataSender) addFailedData(data []*messages.Data) {
|
|
bytes := ds.getFailedData()
|
|
if bytes != nil {
|
|
failed := messages.Data{}
|
|
err := json.Unmarshal(bytes, &failed)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
data = append([]*messages.Data{&failed}, data...) //prepend
|
|
}
|
|
}
|
|
|
|
func (ds *DataSender) getFailedData() []byte {
|
|
b, err := ioutil.ReadFile(FILE_PATH)
|
|
if err != nil {
|
|
log.Error(err)
|
|
return nil
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (ds *DataSender) removeFailed() {
|
|
err := os.Remove(FILE_PATH)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
|
|
func (ds *DataSender) saveFailedData(datas []*messages.Data) {
|
|
|
|
file, err := tempFile()
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
defer func() {
|
|
if err := file.Close(); err != nil {
|
|
log.Error(err)
|
|
return
|
|
}
|
|
}()
|
|
|
|
for _, data := range datas {
|
|
bytes, err := json.Marshal(&data)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
if bytes != nil {
|
|
log.Debug("write : ", string(bytes))
|
|
_, err = file.Write(bytes)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func tempFile() (*os.File, error) {
|
|
|
|
var file *os.File
|
|
var fileInfo os.FileInfo
|
|
var err error
|
|
if fileInfo, err = os.Stat(FILE_PATH); err != nil {
|
|
if os.IsNotExist(err) {
|
|
file, err = os.Create(FILE_PATH)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
} else {
|
|
if fileInfo != nil {
|
|
file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return file, nil
|
|
}
|