diff --git a/data_sender.go b/data_sender.go index d6f045d..93be1f0 100644 --- a/data_sender.go +++ b/data_sender.go @@ -1,22 +1,22 @@ package data_sender_go import ( - "google.golang.org/grpc" - "log" - "context" "encoding/json" - "fmt" + "google.golang.org/grpc" + "io/ioutil" + "loafle.com/overflow/agent_api/observer" pb "loafle.com/overflow/crawler_go/grpc" "loafle.com/overflow/cron_go" q "loafle.com/overflow/queue_go" + "log" "os" "sync" ) const ( CENTRAL_ADDR = "127.0.0.1:50052" - FILE_PATH = "/home/insanity/data/temp" + FILE_PATH = "/home/insanity/data/data.tmp" SENDER_ID = "OVERFLOW_DATA_SENDER" DEFAULT_INTERVAL = uint64(5) ) @@ -45,7 +45,7 @@ func (ds *DataSender) Stop() { } func (ds *DataSender) init() { - ds.queue = q.NewQueue(q.EVENT_TYPE, 3) + ds.queue = q.NewQueue(observer.DATA_QUEUE, 3) cr := &cron.Cron{} ds.runStat = cr.Start() @@ -58,7 +58,7 @@ func (ds *DataSender) check() { if len <= 0 { return } - result := make([]*Data, len) + result := make([]*Data, 0) for _, item := range items { collectedData := item.Value.(*Data) collectedData.AgentId = agentIdentifier() @@ -69,9 +69,22 @@ func (ds *DataSender) check() { } func (ds *DataSender) send(data []*Data) { + + bytes := ds.getFailedData() + if bytes != nil { + failed := Data{} + err := json.Unmarshal(bytes, &failed) + if err != nil { + log.Println(err) + } + data = append([]*Data{&failed}, data...) //prepend + ds.removeFailed() + } + + return conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) if err != nil { - saveFailedData(data) + ds.saveFailedData(data) return } defer conn.Close() @@ -80,13 +93,29 @@ func (ds *DataSender) send(data []*Data) { client := pb.NewStatusClient(conn) out, err := client.Status(context.Background(), &pb.Empty{}) if err != nil { - saveFailedData(data) + ds.saveFailedData(data) return } log.Print(out) } -func saveFailedData(datas []*Data) { +func (ds *DataSender) getFailedData() []byte { + b, err := ioutil.ReadFile(FILE_PATH) + if err != nil { + log.Println(err) + return nil + } + return b +} + +func (ds *DataSender) removeFailed() { + err := os.Remove(FILE_PATH) + if err != nil { + log.Println(err) + } +} + +func (ds *DataSender) saveFailedData(datas []*Data) { file, err := tempFile() if err != nil { @@ -100,13 +129,12 @@ func saveFailedData(datas []*Data) { }() for _, data := range datas { - bytes, err := json.Marshal(&data) if err != nil { log.Println(err) } if bytes != nil { - fmt.Println("write : ", string(bytes)) + log.Println("write : ", string(bytes)) _, err = file.Write(bytes) if err != nil { log.Println(err) diff --git a/data_sender_test.go b/data_sender_test.go index 0f6b941..ad8ddf1 100644 --- a/data_sender_test.go +++ b/data_sender_test.go @@ -3,7 +3,6 @@ package data_sender_go import ( "fmt" "loafle.com/overflow/agent_api/observer" - "loafle.com/overflow/queue_go" "testing" "time" ) @@ -35,5 +34,5 @@ func testNotify() { } fmt.Println("New data Notify") - observer.Notify(queue.StringType(queue.EVENT_TYPE), cd) + observer.Notify(observer.DATA_QUEUE.String(), cd) }