event sender shared
This commit is contained in:
		
							parent
							
								
									100f14cbdb
								
							
						
					
					
						commit
						d5815eb71a
					
				@ -0,0 +1,92 @@
 | 
				
			|||||||
 | 
					package event_sender
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
 | 
						"loafle.com/overflow/agent_api/observer/messages"
 | 
				
			||||||
 | 
						pb "loafle.com/overflow/crawler_go/grpc"
 | 
				
			||||||
 | 
						q "loafle.com/overflow/queue_go"
 | 
				
			||||||
 | 
						"log"
 | 
				
			||||||
 | 
						"reflect"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						CENTRAL_ADDR     = "192.168.1.105:50052"
 | 
				
			||||||
 | 
						SENDER_ID        = "OVERFLOW_EVENT_SENDER"
 | 
				
			||||||
 | 
						DEFAULT_INTERVAL = 1
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Data struct {
 | 
				
			||||||
 | 
						AgentId    string
 | 
				
			||||||
 | 
						SensorId   string
 | 
				
			||||||
 | 
						Data       map[string]string
 | 
				
			||||||
 | 
						StartedAt  uint64
 | 
				
			||||||
 | 
						FinishedAt uint64
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type EventSender struct {
 | 
				
			||||||
 | 
						lq *q.LoafleQueue
 | 
				
			||||||
 | 
						sc chan interface{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (es *EventSender) Start() {
 | 
				
			||||||
 | 
						es.sc = make(chan interface{}, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						es.lq = q.NewQueue(messages.QUEUE_EVENT, DEFAULT_INTERVAL, es.sc)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go es.checkQueue()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (es *EventSender) Stop() {
 | 
				
			||||||
 | 
						if es.sc != nil {
 | 
				
			||||||
 | 
							close(es.sc)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (es *EventSender) checkQueue() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						result := make([]*Data, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for sc := range es.sc {
 | 
				
			||||||
 | 
							items := reflect.ValueOf(sc)
 | 
				
			||||||
 | 
							if items.Kind() != reflect.Slice {
 | 
				
			||||||
 | 
								log.Println("ddddd")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							for i := 0; i < items.Len(); i++ {
 | 
				
			||||||
 | 
								item := items.Index(i).Elem().Interface()
 | 
				
			||||||
 | 
								tempCollectedData := item.(q.Item)
 | 
				
			||||||
 | 
								collectedData := tempCollectedData.Value.(*Data)
 | 
				
			||||||
 | 
								collectedData.AgentId = agentIdentifier()
 | 
				
			||||||
 | 
								result = append(result, collectedData)
 | 
				
			||||||
 | 
								log.Println("Result Len: ", len(result))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							es.send(result)
 | 
				
			||||||
 | 
							result = nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (es *EventSender) send(data []*Data) {
 | 
				
			||||||
 | 
						log.Println("Send Started")
 | 
				
			||||||
 | 
						conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Fatal("Connection Error :", err.Error())
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer conn.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						client := pb.NewStatusClient(conn)
 | 
				
			||||||
 | 
						out, err := client.Status(context.Background(), &pb.Empty{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Fatal("client Error:", err.Error())
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						log.Print(out)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func agentIdentifier() string {
 | 
				
			||||||
 | 
						return "agentID_000000001"
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user