event_sender_go/event_sender.go

136 lines
2.5 KiB
Go
Raw Normal View History

2017-04-27 10:02:46 +00:00
package event_sender
import (
"context"
"google.golang.org/grpc"
2017-04-28 09:39:11 +00:00
"loafle.com/overflow/agent_api/observer"
2017-04-27 10:02:46 +00:00
"loafle.com/overflow/agent_api/observer/messages"
pb "loafle.com/overflow/crawler_go/grpc"
q "loafle.com/overflow/queue_go"
"log"
"reflect"
2017-04-28 03:37:33 +00:00
"sync"
2017-05-11 09:55:38 +00:00
"time"
2017-04-28 03:37:33 +00:00
)
var (
instance *EventSender
once sync.Once
2017-04-27 10:02:46 +00:00
)
const (
CENTRAL_ADDR = "192.168.1.105:50052"
SENDER_ID = "OVERFLOW_EVENT_SENDER"
DEFAULT_INTERVAL = 1
)
type Data struct {
AgentId string
SensorId string
Data map[string]string
StartedAt uint64
FinishedAt uint64
}
2017-04-28 03:37:33 +00:00
func GetInstance() *EventSender {
once.Do(func() {
instance = &EventSender{}
})
return instance
}
2017-04-28 09:39:11 +00:00
func init() {
2017-04-28 03:37:33 +00:00
ch := make(chan interface{}, 0)
2017-05-11 09:55:38 +00:00
ch1 := make(chan interface{}, 0)
observer.Add(messages.CFG_LOADED, ch)
2017-05-11 09:56:57 +00:00
observer.Add(messages.SCF_STOPPED, ch1)
2017-05-11 09:55:38 +00:00
handleInit(ch, ch1)
2017-04-28 03:37:33 +00:00
}
2017-05-11 09:55:38 +00:00
func handleInit(ch chan interface{}, ch1 chan interface{}) {
2017-04-28 03:37:33 +00:00
es := GetInstance()
go func() {
data := <-ch
log.Println("handleInit", data)
//ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
es.start()
2017-05-11 09:55:38 +00:00
observer.Remove(messages.CFG_LOADED, ch)
}()
go func() {
2017-05-11 09:56:57 +00:00
data := <-ch1
observer.Notify(messages.QUEUE_EVENT, data)
time.Sleep(2 * time.Second)
2017-05-11 09:55:38 +00:00
es.Stop()
2017-04-28 03:37:33 +00:00
}()
}
2017-04-27 10:02:46 +00:00
type EventSender struct {
2017-04-28 09:39:11 +00:00
once sync.Once
lq *q.LoafleQueue
sc chan interface{}
2017-04-27 10:02:46 +00:00
}
2017-04-28 03:37:33 +00:00
func (es *EventSender) start() {
2017-04-27 10:02:46 +00:00
es.sc = make(chan interface{}, 0)
es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc)
go es.checkQueue()
}
func (es *EventSender) Stop() {
if es.sc != nil {
close(es.sc)
2017-04-28 02:11:37 +00:00
es.sc = nil
2017-04-27 10:02:46 +00:00
}
}
func (es *EventSender) checkQueue() {
result := make([]*Data, 0)
for sc := range es.sc {
items := reflect.ValueOf(sc)
if items.Kind() != reflect.Slice {
log.Println("ddddd")
}
for i := 0; i < items.Len(); i++ {
item := items.Index(i).Elem().Interface()
tempCollectedData := item.(q.Item)
collectedData := tempCollectedData.Value.(*Data)
collectedData.AgentId = agentIdentifier()
result = append(result, collectedData)
2017-04-28 02:11:37 +00:00
//log.Println("Result Len: ", len(result))
2017-04-27 10:02:46 +00:00
}
es.send(result)
result = nil
}
}
func (es *EventSender) send(data []*Data) {
log.Println("Send Started")
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil {
log.Fatal("Connection Error :", err.Error())
return
}
defer conn.Close()
client := pb.NewStatusClient(conn)
out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil {
log.Fatal("client Error:", err.Error())
return
}
log.Print(out)
}
func agentIdentifier() string {
return "agentID_000000001"
}