package data_sender_go import ( "encoding/json" "io/ioutil" cm "loafle.com/overflow/agent_api/config_manager" "loafle.com/overflow/agent_api/messages" pb "loafle.com/overflow/crawler_go/grpc" q "loafle.com/overflow/queue_go" "log" "os" "sync" ) const ( FILE_PATH = "/overflow/tmp/data.tmp" DEFAULT_INTERVAL = 10 ) var ( instance *DataSender once sync.Once ) type DataSender struct { once sync.Once queue *q.LoafleQueue gconf *cm.GlobalConfig } func Start(ch chan bool, conf *cm.GlobalConfig) { d := GetInstance() d.start(conf) ch <- true } func Stop(ch chan bool) { GetInstance().stop() ch <- true } func GetInstance() *DataSender { once.Do(func() { instance = &DataSender{} }) return instance } func (ds *DataSender) start(conf *cm.GlobalConfig) { qc := make(chan interface{}) ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc) ds.gconf = conf go ds.handleData(qc) } func (ds *DataSender) stop() {} func (ds *DataSender) handleData(qc chan interface{}) { for { select { case items := <-qc: result := make([]*messages.Data, 0) for _, item := range items.([]*q.Item) { collectedData := item.Value.(*pb.Output) d := &messages.Data{} d.Data = collectedData.Data d.AgentId = agentIdentifier() result = append(result, d) } ds.send(result) } } } func AddData(data interface{}) { ds := GetInstance() ds.queue.PushItem(data) } func (ds *DataSender) send(data []*messages.Data) { //for _, v := range data { // //log.Printf("SEND SENSOR RESULT : %s - %s", v.SensorId, v.Data) //} //ds.addFailedData(data) //addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port) //conn, err := grpc.Dial(addr, grpc.WithInsecure()) //if err != nil { // ds.saveFailedData(data) // return //} //defer conn.Close() //temporary //client := pb.NewStatusClient(conn) //out, err := client.Status(context.Background(), &pb.Empty{}) //if err != nil { // ds.saveFailedData(data) // return //} //ds.removeFailed() //log.Print(out) } func (ds *DataSender) addFailedData(data []*messages.Data) { bytes := ds.getFailedData() if bytes != nil { failed := messages.Data{} err := json.Unmarshal(bytes, &failed) if err != nil { log.Println(err) } data = append([]*messages.Data{&failed}, data...) //prepend } } func (ds *DataSender) getFailedData() []byte { b, err := ioutil.ReadFile(FILE_PATH) if err != nil { log.Println(err) return nil } return b } func (ds *DataSender) removeFailed() { err := os.Remove(FILE_PATH) if err != nil { log.Println(err) } } func (ds *DataSender) saveFailedData(datas []*messages.Data) { file, err := tempFile() if err != nil { log.Println(err) } defer func() { if err := file.Close(); err != nil { log.Println(err) return } }() for _, data := range datas { bytes, err := json.Marshal(&data) if err != nil { log.Println(err) } if bytes != nil { log.Println("write : ", string(bytes)) _, err = file.Write(bytes) if err != nil { log.Println(err) } } } } func tempFile() (*os.File, error) { var file *os.File var fileInfo os.FileInfo var err error if fileInfo, err = os.Stat(FILE_PATH); err != nil { if os.IsNotExist(err) { file, err = os.Create(FILE_PATH) if err != nil { return nil, err } } } else { if fileInfo != nil { file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660) if err != nil { return nil, err } } } return file, nil } func agentIdentifier() string { return "agentID_000000001" }