data_sender

This commit is contained in:
insanity@loafle.com 2017-04-27 20:22:19 +09:00
parent 34bdb1d8af
commit 26f82a78b0
2 changed files with 110 additions and 52 deletions

View File

@ -5,9 +5,10 @@ import (
"encoding/json" "encoding/json"
"google.golang.org/grpc" "google.golang.org/grpc"
"io/ioutil" "io/ioutil"
cm "loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
pb "loafle.com/overflow/crawler_go/grpc" pb "loafle.com/overflow/crawler_go/grpc"
"loafle.com/overflow/cron_go"
q "loafle.com/overflow/queue_go" q "loafle.com/overflow/queue_go"
"log" "log"
"os" "os"
@ -17,8 +18,12 @@ import (
const ( const (
CENTRAL_ADDR = "127.0.0.1:50052" CENTRAL_ADDR = "127.0.0.1:50052"
FILE_PATH = "/overflow/tmp/data.tmp" FILE_PATH = "/overflow/tmp/data.tmp"
SENDER_ID = "OVERFLOW_DATA_SENDER" DEFAULT_INTERVAL = 10
DEFAULT_INTERVAL = uint64(5) )
var (
instance *DataSender
once sync.Once
) )
type Data struct { type Data struct {
@ -31,55 +36,68 @@ type Data struct {
type DataSender struct { type DataSender struct {
once sync.Once once sync.Once
runStat chan bool
queue *q.LoafleQueue queue *q.LoafleQueue
gconf *cm.GlobalConfig
} }
func (ds *DataSender) Start() { func init() {
addObservers()
}
func GetInstance() *DataSender {
once.Do(func() {
instance = &DataSender{}
})
return instance
}
func addObservers() {
ch := make(chan interface{}, 0)
observer.Add(messages.CONFIGMANAGER_LOADED, ch)
handleInit(ch)
}
func handleInit(ch chan interface{}) {
ds := GetInstance()
go func() {
data := <-ch
log.Println(data)
//ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
ds.start()
close(ch)
}()
}
func (ds *DataSender) start() {
ds.once.Do(func() { ds.once.Do(func() {
ds.init() qc := make(chan interface{})
ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc)
go ds.handleData(qc)
}) })
} }
func (ds *DataSender) Stop() { func (ds *DataSender) Stop() {
ds.runStat <- false
} }
func (ds *DataSender) init() { func (ds *DataSender) handleData(qc chan interface{}) {
ds.queue = q.NewQueue(observer.DATA_QUEUE, 3) for {
select {
cr := &cron.Cron{} case items := <-qc:
ds.runStat = cr.Start()
cr.AddTask(SENDER_ID, DEFAULT_INTERVAL).Invoke(ds.check)
}
func (ds *DataSender) check() {
items := ds.queue.GetItems()
len := len(items)
if len <= 0 {
return
}
result := make([]*Data, 0) result := make([]*Data, 0)
for _, item := range items { for _, item := range items.([]*q.Item) {
collectedData := item.Value.(*Data) collectedData := item.Value.(*Data)
collectedData.AgentId = agentIdentifier() collectedData.AgentId = agentIdentifier()
result = append(result, collectedData) result = append(result, collectedData)
} }
ds.send(result) ds.send(result)
}
}
} }
func (ds *DataSender) send(data []*Data) { func (ds *DataSender) send(data []*Data) {
bytes := ds.getFailedData() ds.addFailedData(data)
if bytes != nil {
failed := Data{}
err := json.Unmarshal(bytes, &failed)
if err != nil {
log.Println(err)
}
data = append([]*Data{&failed}, data...) //prepend
ds.removeFailed()
}
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil { if err != nil {
@ -95,9 +113,22 @@ func (ds *DataSender) send(data []*Data) {
ds.saveFailedData(data) ds.saveFailedData(data)
return return
} }
ds.removeFailed()
log.Print(out) log.Print(out)
} }
func (ds *DataSender) addFailedData(data []*Data) {
bytes := ds.getFailedData()
if bytes != nil {
failed := Data{}
err := json.Unmarshal(bytes, &failed)
if err != nil {
log.Println(err)
}
data = append([]*Data{&failed}, data...) //prepend
}
}
func (ds *DataSender) getFailedData() []byte { func (ds *DataSender) getFailedData() []byte {
b, err := ioutil.ReadFile(FILE_PATH) b, err := ioutil.ReadFile(FILE_PATH)
if err != nil { if err != nil {

View File

@ -1,8 +1,9 @@
package data_sender_go package data_sender_go
import ( import (
"fmt"
"loafle.com/overflow/agent_api/observer" "loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
"strconv"
"testing" "testing"
"time" "time"
) )
@ -11,28 +12,54 @@ type Result struct {
Data []byte Data []byte
} }
func TestSend(t *testing.T) { func TestTotal(t *testing.T) {
ds := &DataSender{} time.Sleep(time.Second * 5)
ds.Start()
testNotify() observer.Notify("CONFIGMANAGER_LOADED", nil)
time.Sleep(time.Second * 5)
for i := 0; i < 20; i++ {
testNotify(strconv.Itoa(i))
}
time.Sleep(time.Second * 12)
for i := 20; i < 30; i++ {
testNotify(strconv.Itoa(i))
}
time.Sleep(time.Second * 100) time.Sleep(time.Second * 100)
} }
func testNotify() { func TestSend(t *testing.T) {
time.Sleep(time.Second * 5)
ds := &DataSender{}
ds.start()
for i := 0; i < 20; i++ {
testNotify(strconv.Itoa(i))
}
time.Sleep(time.Second * 12)
for i := 20; i < 30; i++ {
testNotify(strconv.Itoa(i))
}
time.Sleep(time.Second * 100)
}
func testNotify(val string) {
result := make(map[string]string) result := make(map[string]string)
result["ab"] = "123" result["a"] = val
result["cd"] = "456" result["b"] = val
result["ef"] = "789" result["c"] = val
cd := &Data{ cd := &Data{
SensorId: "insanity", SensorId: "insanity",
Data: result, Data: result,
} }
fmt.Println("New data Notify") observer.Notify(messages.QUEUE_DATA, cd)
observer.Notify(observer.DATA_QUEUE.String(), cd)
} }