data_sender

This commit is contained in:
insanity@loafle.com 2017-05-15 19:07:59 +09:00
parent 3aff00acc5
commit 1370a0c821
2 changed files with 38 additions and 60 deletions

View File

@ -1,12 +1,9 @@
package data_sender_go package data_sender_go
import ( import (
"context"
"encoding/json" "encoding/json"
"google.golang.org/grpc"
"io/ioutil" "io/ioutil"
cm "loafle.com/overflow/agent_api/config_manager" cm "loafle.com/overflow/agent_api/config_manager"
"loafle.com/overflow/agent_api/observer/messages"
pb "loafle.com/overflow/crawler_go/grpc" pb "loafle.com/overflow/crawler_go/grpc"
q "loafle.com/overflow/queue_go" q "loafle.com/overflow/queue_go"
"log" "log"
@ -27,7 +24,7 @@ var (
type Data struct { type Data struct {
AgentId string AgentId string
SensorId string SensorId string
Data map[string]string Data []byte
StartedAt uint64 StartedAt uint64
FinishedAt uint64 FinishedAt uint64
} }
@ -38,8 +35,15 @@ type DataSender struct {
gconf *cm.GlobalConfig gconf *cm.GlobalConfig
} }
func init() { func Start(ch chan bool, conf *cm.GlobalConfig) {
go handleConfigLoaded() d := DataSender{}
d.start(conf)
ch <- true
}
func Stop(ch chan bool) {
GetInstance().stop()
ch <- true
} }
func GetInstance() *DataSender { func GetInstance() *DataSender {
@ -49,30 +53,17 @@ func GetInstance() *DataSender {
return instance return instance
} }
func startDataSender(ch chan interface{}) { func (ds *DataSender) start(conf *cm.GlobalConfig) {
ds := GetInstance()
go func() {
data := <-ch
log.Println(data)
ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
ds.start()
ds.addObservers()
}()
}
func (ds *DataSender) start() {
ds.once.Do(func() { ds.once.Do(func() {
qc := make(chan interface{}) qc := make(chan interface{})
ds.queue = q.NewQueue(messages.QUEUE_DATA, DEFAULT_INTERVAL, qc) ds.queue = q.NewQueue(DEFAULT_INTERVAL, qc)
ds.gconf = conf
go ds.handleData(qc) go ds.handleData(qc)
}) })
} }
func (ds *DataSender) addObservers() { func (ds *DataSender) stop() {}
go ds.handleAgentStop()
}
func (ds *DataSender) Stop() {}
func (ds *DataSender) handleData(qc chan interface{}) { func (ds *DataSender) handleData(qc chan interface{}) {
for { for {
@ -80,36 +71,43 @@ func (ds *DataSender) handleData(qc chan interface{}) {
case items := <-qc: case items := <-qc:
result := make([]*Data, 0) result := make([]*Data, 0)
for _, item := range items.([]*q.Item) { for _, item := range items.([]*q.Item) {
collectedData := item.Value.(*Data) collectedData := item.Value.(*pb.Output)
collectedData.AgentId = agentIdentifier() d := &Data{}
result = append(result, collectedData) d.Data = collectedData.Data
d.AgentId = agentIdentifier()
result = append(result, d)
} }
ds.send(result) ds.send(result)
} }
} }
} }
func AddData(data interface{}) {
GetInstance().queue.PushItem(data)
}
func (ds *DataSender) send(data []*Data) { func (ds *DataSender) send(data []*Data) {
ds.addFailedData(data) ds.addFailedData(data)
log.Println("TO CENTRAL: ", data)
addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port) //addr := ds.gconf.Central.Address + ":" + string(ds.gconf.Central.Port)
conn, err := grpc.Dial(addr, grpc.WithInsecure()) //conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil { //if err != nil {
ds.saveFailedData(data) // ds.saveFailedData(data)
return // return
} //}
defer conn.Close() //defer conn.Close()
//temporary //temporary
client := pb.NewStatusClient(conn) //client := pb.NewStatusClient(conn)
out, err := client.Status(context.Background(), &pb.Empty{}) //out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { //if err != nil {
ds.saveFailedData(data) // ds.saveFailedData(data)
return // return
} //}
ds.removeFailed() ds.removeFailed()
log.Print(out) //log.Print(out)
} }
func (ds *DataSender) addFailedData(data []*Data) { func (ds *DataSender) addFailedData(data []*Data) {

View File

@ -1,20 +0,0 @@
package data_sender_go
import (
"loafle.com/overflow/agent_api/observer"
"loafle.com/overflow/agent_api/observer/messages"
)
func handleConfigLoaded() {
ch := make(chan interface{}, 0)
observer.Add(messages.CFG_LOADED, ch)
startDataSender(ch)
//observer.Remove(messages.CFG_LOADED, ch)
}
func (ds *DataSender) handleAgentStop() {
ch := make(chan interface{}, 0)
observer.Add(messages.AGT_STOPPED, ch)
ds.Stop()
//observer.Remove(messages.AGT_STOPPED, ch)
}