resend failed

This commit is contained in:
insanity@loafle.com 2017-04-26 20:00:27 +09:00
parent 4f64a4a915
commit 87f6a06831
2 changed files with 41 additions and 14 deletions

View File

@ -1,22 +1,22 @@
package data_sender_go package data_sender_go
import ( import (
"google.golang.org/grpc"
"log"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "google.golang.org/grpc"
"io/ioutil"
"loafle.com/overflow/agent_api/observer"
pb "loafle.com/overflow/crawler_go/grpc" pb "loafle.com/overflow/crawler_go/grpc"
"loafle.com/overflow/cron_go" "loafle.com/overflow/cron_go"
q "loafle.com/overflow/queue_go" q "loafle.com/overflow/queue_go"
"log"
"os" "os"
"sync" "sync"
) )
const ( const (
CENTRAL_ADDR = "127.0.0.1:50052" CENTRAL_ADDR = "127.0.0.1:50052"
FILE_PATH = "/home/insanity/data/temp" FILE_PATH = "/home/insanity/data/data.tmp"
SENDER_ID = "OVERFLOW_DATA_SENDER" SENDER_ID = "OVERFLOW_DATA_SENDER"
DEFAULT_INTERVAL = uint64(5) DEFAULT_INTERVAL = uint64(5)
) )
@ -45,7 +45,7 @@ func (ds *DataSender) Stop() {
} }
func (ds *DataSender) init() { func (ds *DataSender) init() {
ds.queue = q.NewQueue(q.EVENT_TYPE, 3) ds.queue = q.NewQueue(observer.DATA_QUEUE, 3)
cr := &cron.Cron{} cr := &cron.Cron{}
ds.runStat = cr.Start() ds.runStat = cr.Start()
@ -58,7 +58,7 @@ func (ds *DataSender) check() {
if len <= 0 { if len <= 0 {
return return
} }
result := make([]*Data, len) result := make([]*Data, 0)
for _, item := range items { for _, item := range items {
collectedData := item.Value.(*Data) collectedData := item.Value.(*Data)
collectedData.AgentId = agentIdentifier() collectedData.AgentId = agentIdentifier()
@ -69,9 +69,22 @@ func (ds *DataSender) check() {
} }
func (ds *DataSender) send(data []*Data) { func (ds *DataSender) send(data []*Data) {
bytes := ds.getFailedData()
if bytes != nil {
failed := Data{}
err := json.Unmarshal(bytes, &failed)
if err != nil {
log.Println(err)
}
data = append([]*Data{&failed}, data...) //prepend
ds.removeFailed()
}
return
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil { if err != nil {
saveFailedData(data) ds.saveFailedData(data)
return return
} }
defer conn.Close() defer conn.Close()
@ -80,13 +93,29 @@ func (ds *DataSender) send(data []*Data) {
client := pb.NewStatusClient(conn) client := pb.NewStatusClient(conn)
out, err := client.Status(context.Background(), &pb.Empty{}) out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { if err != nil {
saveFailedData(data) ds.saveFailedData(data)
return return
} }
log.Print(out) log.Print(out)
} }
func saveFailedData(datas []*Data) { func (ds *DataSender) getFailedData() []byte {
b, err := ioutil.ReadFile(FILE_PATH)
if err != nil {
log.Println(err)
return nil
}
return b
}
func (ds *DataSender) removeFailed() {
err := os.Remove(FILE_PATH)
if err != nil {
log.Println(err)
}
}
func (ds *DataSender) saveFailedData(datas []*Data) {
file, err := tempFile() file, err := tempFile()
if err != nil { if err != nil {
@ -100,13 +129,12 @@ func saveFailedData(datas []*Data) {
}() }()
for _, data := range datas { for _, data := range datas {
bytes, err := json.Marshal(&data) bytes, err := json.Marshal(&data)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
if bytes != nil { if bytes != nil {
fmt.Println("write : ", string(bytes)) log.Println("write : ", string(bytes))
_, err = file.Write(bytes) _, err = file.Write(bytes)
if err != nil { if err != nil {
log.Println(err) log.Println(err)

View File

@ -3,7 +3,6 @@ package data_sender_go
import ( import (
"fmt" "fmt"
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/queue_go"
"testing" "testing"
"time" "time"
) )
@ -35,5 +34,5 @@ func testNotify() {
} }
fmt.Println("New data Notify") fmt.Println("New data Notify")
observer.Notify(queue.StringType(queue.EVENT_TYPE), cd) observer.Notify(observer.DATA_QUEUE.String(), cd)
} }