overflow_probe/event_sender/event_sender.go
insanity@loafle.com 4b03045eb5 fix
2017-08-04 12:08:17 +09:00

150 lines
2.8 KiB
Go

package event_sender
import (
"context"
"google.golang.org/grpc"
//"git.loafle.net/overflow/overflow_probe/agent_api/observer/messages"
pb "git.loafle.net/overflow/overflow_probe/crawler/grpc"
"git.loafle.net/overflow/overflow_probe/agent_api/messages"
q "git.loafle.net/overflow/overflow_probe/queue"
log "github.com/cihub/seelog"
"reflect"
"sync"
)
var (
instance *EventSender
once sync.Once
)
const (
CENTRAL_ADDR = "192.168.1.105:50052"
DEFAULT_INTERVAL = 1
)
type Data struct {
AgentId string
SensorId string
Data map[string]string
StartedAt uint64
FinishedAt uint64
}
func GetInstance() *EventSender {
once.Do(func() {
instance = &EventSender{}
})
return instance
}
//func init() {
// ch := make(chan interface{}, 0)
// ch1 := make(chan interface{}, 0)
// observer.Add(messages.CFG_LOADED, ch)
// observer.Add(messages.SCF_STOPPED, ch1)
// handleInit(ch, ch1)
//}
//
//func handleInit(ch chan interface{}, ch1 chan interface{}) {
// es := GetInstance()
// go func() {
// data := <-ch
// log.Println("handleInit", data)
// //ds.gconf = data.(cm.ConfigManager).GetGlobalConfig()
// es.start()
// observer.Remove(messages.CFG_LOADED, ch)
// }()
//
// go func() {
// data := <-ch1
// observer.Notify(messages.QUEUE_EVENT, data)
// time.Sleep(2 * time.Second)
// es.Stop()
// }()
//}
func Start(ch chan bool) (err error) {
es := GetInstance()
es.start()
ch <- true
return nil
}
func Stop() (err error) {
es := GetInstance()
es.stop()
return nil
}
func AddEventData(event interface{}) {
es := GetInstance()
es.lq.PushItem(event)
}
type EventSender struct {
once sync.Once
lq *q.LoafleQueue
sc chan interface{}
}
func (es *EventSender) start() {
es.sc = make(chan interface{}, 0)
es.lq = q.NewQueue(DEFAULT_INTERVAL, es.sc)
go es.checkQueue()
}
func (es *EventSender) stop() {
if es.sc != nil {
es.lq.Close()
close(es.sc)
es.sc = nil
}
}
func (es *EventSender) checkQueue() {
result := make([]*messages.Event, 0)
for sc := range es.sc {
items := reflect.ValueOf(sc)
if items.Kind() != reflect.Slice {
log.Info("ddddd")
}
for i := 0; i < items.Len(); i++ {
item := items.Index(i).Elem().Interface()
tempCollectedData := item.(q.Item)
collectedData := tempCollectedData.Value.(messages.Event)
//collectedData.AgentId = agentIdentifier()
result = append(result, &collectedData)
//log.Println("Result Len: ", len(result))
}
//es.send(result)
result = nil
}
}
func (es *EventSender) send(data []*messages.Event) {
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil {
log.Error("Connection Error :", err.Error())
return
}
defer conn.Close()
client := pb.NewStatusClient(conn)
out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil {
log.Error("client Error:", err.Error())
return
}
log.Debug(out)
}