event sender shared
This commit is contained in:
parent
7987e27fc7
commit
407e1e5f70
92
event_sender.go
Normal file
92
event_sender.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package event_sender
|
||||
|
||||
import (
|
||||
"context"
|
||||
"google.golang.org/grpc"
|
||||
"loafle.com/overflow/agent_api/observer/messages"
|
||||
pb "loafle.com/overflow/crawler_go/grpc"
|
||||
q "loafle.com/overflow/queue_go"
|
||||
"log"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
const (
|
||||
CENTRAL_ADDR = "192.168.1.105:50052"
|
||||
SENDER_ID = "OVERFLOW_EVENT_SENDER"
|
||||
DEFAULT_INTERVAL = 1
|
||||
)
|
||||
|
||||
type Data struct {
|
||||
AgentId string
|
||||
SensorId string
|
||||
Data map[string]string
|
||||
StartedAt uint64
|
||||
FinishedAt uint64
|
||||
}
|
||||
|
||||
type EventSender struct {
|
||||
lq *q.LoafleQueue
|
||||
sc chan interface{}
|
||||
}
|
||||
|
||||
func (es *EventSender) Start() {
|
||||
es.sc = make(chan interface{}, 0)
|
||||
|
||||
es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc)
|
||||
|
||||
go es.checkQueue()
|
||||
}
|
||||
|
||||
func (es *EventSender) Stop() {
|
||||
if es.sc != nil {
|
||||
close(es.sc)
|
||||
}
|
||||
}
|
||||
|
||||
func (es *EventSender) checkQueue() {
|
||||
|
||||
result := make([]*Data, 0)
|
||||
|
||||
for sc := range es.sc {
|
||||
items := reflect.ValueOf(sc)
|
||||
if items.Kind() != reflect.Slice {
|
||||
log.Println("ddddd")
|
||||
}
|
||||
|
||||
for i := 0; i < items.Len(); i++ {
|
||||
item := items.Index(i).Elem().Interface()
|
||||
tempCollectedData := item.(q.Item)
|
||||
collectedData := tempCollectedData.Value.(*Data)
|
||||
collectedData.AgentId = agentIdentifier()
|
||||
result = append(result, collectedData)
|
||||
log.Println("Result Len: ", len(result))
|
||||
}
|
||||
es.send(result)
|
||||
result = nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (es *EventSender) send(data []*Data) {
|
||||
log.Println("Send Started")
|
||||
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Connection Error :", err.Error())
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := pb.NewStatusClient(conn)
|
||||
out, err := client.Status(context.Background(), &pb.Empty{})
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("client Error:", err.Error())
|
||||
return
|
||||
}
|
||||
log.Print(out)
|
||||
}
|
||||
|
||||
func agentIdentifier() string {
|
||||
return "agentID_000000001"
|
||||
}
|
36
event_sender_test.go
Normal file
36
event_sender_test.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package event_sender
|
||||
|
||||
import (
|
||||
"loafle.com/overflow/agent_api/observer"
|
||||
"loafle.com/overflow/agent_api/observer/messages"
|
||||
"log"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEventSender_Start(t *testing.T) {
|
||||
|
||||
es := &EventSender{}
|
||||
es.Start()
|
||||
testNotify()
|
||||
|
||||
time.Sleep(time.Second * 100)
|
||||
|
||||
}
|
||||
|
||||
func testNotify() {
|
||||
//time.Sleep(time.Second * 5)
|
||||
|
||||
result := make(map[string]string)
|
||||
result["ab"] = "123"
|
||||
result["cd"] = "456"
|
||||
result["ef"] = "789"
|
||||
|
||||
cd := &Data{
|
||||
SensorId: "insanity",
|
||||
Data: result,
|
||||
}
|
||||
|
||||
log.Println("New data Notify")
|
||||
observer.Notify(messages.QUEUE_EVENT, cd)
|
||||
}
|
Loading…
Reference in New Issue
Block a user