data_sender

This commit is contained in:
insanity@loafle.com 2017-04-26 18:07:44 +09:00
parent 148dcd55af
commit 048a8e70ca
2 changed files with 108 additions and 30 deletions

View File

@ -2,20 +2,73 @@ package data_sender_go
import ( import (
"google.golang.org/grpc" "google.golang.org/grpc"
_ "loafle.com/overflow/cron_go"
"log" "log"
pb "loafle.com/overflow/crawler_go/grpc"
"context" "context"
"encoding/json"
pb "loafle.com/overflow/crawler_go/grpc"
"loafle.com/overflow/cron_go"
q "loafle.com/overflow/queue_go"
"os" "os"
"sync"
) )
const ( const (
CENTRAL_ADDR = "127.0.0.1:50052" CENTRAL_ADDR = "127.0.0.1:50052"
FILE_PATH = "/home/insanity/data/temp" FILE_PATH = "/home/insanity/data/temp"
SENDER_ID = "OVERFLOW_DATA_SENDER"
DEFAULT_INTERVAL = uint64(5)
) )
func Send(data []byte) { type Data struct {
AgentId string
SensorId string
Data map[string]string
StartedAt uint64
FinishedAt uint64
}
type DataSender struct {
once sync.Once
runStat chan bool
queue *q.LoafleQueue
}
func (ds *DataSender) Start() {
ds.once.Do(func() {
ds.init()
})
}
func (ds *DataSender) Stop() {
ds.runStat <- false
}
func (ds *DataSender) init() {
ds.queue = q.NewQueue(q.EVENT_TYPE, 3)
cr := &cron.Cron{}
ds.runStat = cr.Start()
cr.AddTask(SENDER_ID, DEFAULT_INTERVAL).Invoke(ds.check)
}
func (ds *DataSender) check() {
items := ds.queue.GetItems()
len := len(items)
if len <= 0 {
return
}
result := make([]*Data, len)
for _, item := range items {
collectedData := item.Value.(*Data)
collectedData.AgentId = agentIdentifier()
log.Println("collectedData : ", collectedData)
result = append(result, collectedData)
}
ds.send(result)
}
func (ds *DataSender) send(data []*Data) {
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil { if err != nil {
saveFailedData(data) saveFailedData(data)
@ -33,7 +86,7 @@ func Send(data []byte) {
log.Print(out) log.Print(out)
} }
func saveFailedData(data []byte) { func saveFailedData(datas []*Data) {
file, err := tempFile() file, err := tempFile()
if err != nil { if err != nil {
@ -46,10 +99,18 @@ func saveFailedData(data []byte) {
} }
}() }()
_, err = file.Write(data) for _, data := range datas {
bytes, err := json.Marshal(&data)
log.Println("write : ", string(bytes))
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
_, err = file.Write(bytes)
if err != nil {
log.Println(err)
}
}
} }
func tempFile() (*os.File, error) { func tempFile() (*os.File, error) {
@ -66,7 +127,7 @@ func tempFile() (*os.File, error) {
} }
} else { } else {
if fileInfo != nil { if fileInfo != nil {
file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660); file, err = os.OpenFile(FILE_PATH, os.O_RDWR|os.O_APPEND, 0660)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -75,3 +136,7 @@ func tempFile() (*os.File, error) {
return file, nil return file, nil
} }
func agentIdentifier() string {
return "agentID_000000001"
}

View File

@ -1,26 +1,39 @@
package data_sender_go package data_sender_go
import ( import (
"testing"
"loafle.com/overflow/crawler_go/grpc"
"time"
"bytes"
"encoding/binary"
"fmt" "fmt"
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/queue_go"
"testing"
"time"
) )
type Result struct {
Data []byte
}
func TestSend(t *testing.T) { func TestSend(t *testing.T) {
result := &grpc.Output{
StartDate:time.Now().UnixNano(), ds := &DataSender{}
EndDate:time.Now().UnixNano(), ds.Start()
Data:[]byte{'a','b','c'}, testNotify()
time.Sleep(time.Second * 100)
} }
var buf bytes.Buffer func testNotify() {
binary.Write(&buf, binary.BigEndian, result) time.Sleep(time.Second * 5)
fmt.Println(buf.Bytes()) result := make(map[string]string)
Send(buf.Bytes()) result["ab"] = "123"
result["cd"] = "456"
result["ef"] = "789"
cd := &Data{
SensorId: "insanity",
Data: result,
} }
fmt.Println("New data Notify")
observer.Notify(queue.StringType(queue.EVENT_TYPE), cd)
}