This commit is contained in:
insanity@loafle.com 2017-08-04 12:02:50 +09:00
parent 2f9be008fd
commit fbf679b13a
5 changed files with 48 additions and 58 deletions

View File

@ -7,7 +7,7 @@ import (
pb "git.loafle.net/overflow/overflow_probe/crawler/grpc" pb "git.loafle.net/overflow/overflow_probe/crawler/grpc"
q "git.loafle.net/overflow/overflow_probe/queue" q "git.loafle.net/overflow/overflow_probe/queue"
"io/ioutil" "io/ioutil"
"log" log "github.com/cihub/seelog"
"os" "os"
"sync" "sync"
) )
@ -64,7 +64,6 @@ func (ds *DataSender) handleData(qc chan interface{}) {
collectedData := item.Value.(*pb.Output) collectedData := item.Value.(*pb.Output)
d := &messages.Data{} d := &messages.Data{}
d.Data = collectedData.Data d.Data = collectedData.Data
d.AgentId = agentIdentifier()
result = append(result, d) result = append(result, d)
} }
ds.send(result) ds.send(result)
@ -108,7 +107,7 @@ func (ds *DataSender) addFailedData(data []*messages.Data) {
failed := messages.Data{} failed := messages.Data{}
err := json.Unmarshal(bytes, &failed) err := json.Unmarshal(bytes, &failed)
if err != nil { if err != nil {
log.Println(err) log.Error(err)
} }
data = append([]*messages.Data{&failed}, data...) //prepend data = append([]*messages.Data{&failed}, data...) //prepend
} }
@ -117,7 +116,7 @@ func (ds *DataSender) addFailedData(data []*messages.Data) {
func (ds *DataSender) getFailedData() []byte { func (ds *DataSender) getFailedData() []byte {
b, err := ioutil.ReadFile(FILE_PATH) b, err := ioutil.ReadFile(FILE_PATH)
if err != nil { if err != nil {
log.Println(err) log.Error(err)
return nil return nil
} }
return b return b
@ -126,7 +125,7 @@ func (ds *DataSender) getFailedData() []byte {
func (ds *DataSender) removeFailed() { func (ds *DataSender) removeFailed() {
err := os.Remove(FILE_PATH) err := os.Remove(FILE_PATH)
if err != nil { if err != nil {
log.Println(err) log.Error(err)
} }
} }
@ -134,11 +133,11 @@ func (ds *DataSender) saveFailedData(datas []*messages.Data) {
file, err := tempFile() file, err := tempFile()
if err != nil { if err != nil {
log.Println(err) log.Error(err)
} }
defer func() { defer func() {
if err := file.Close(); err != nil { if err := file.Close(); err != nil {
log.Println(err) log.Error(err)
return return
} }
}() }()
@ -146,13 +145,13 @@ func (ds *DataSender) saveFailedData(datas []*messages.Data) {
for _, data := range datas { for _, data := range datas {
bytes, err := json.Marshal(&data) bytes, err := json.Marshal(&data)
if err != nil { if err != nil {
log.Println(err) log.Error(err)
} }
if bytes != nil { if bytes != nil {
log.Println("write : ", string(bytes)) log.Debug("write : ", string(bytes))
_, err = file.Write(bytes) _, err = file.Write(bytes)
if err != nil { if err != nil {
log.Println(err) log.Error(err)
} }
} }
} }
@ -182,6 +181,3 @@ func tempFile() (*os.File, error) {
return file, nil return file, nil
} }
func agentIdentifier() string {
return "agentID_000000001"
}

View File

@ -31,34 +31,34 @@ func TestTotal(t *testing.T) {
time.Sleep(time.Second * 100) time.Sleep(time.Second * 100)
} }
func TestSend(t *testing.T) { //func TestSend(t *testing.T) {
//
ds := &DataSender{} // ds := &DataSender{}
ds.start() // ds.start()
//
for i := 0; i < 20; i++ { // for i := 0; i < 20; i++ {
testNotify(strconv.Itoa(i)) // testNotify(strconv.Itoa(i))
} // }
//
time.Sleep(time.Second * 12) // time.Sleep(time.Second * 12)
for i := 20; i < 30; i++ { // for i := 20; i < 30; i++ {
testNotify(strconv.Itoa(i)) // testNotify(strconv.Itoa(i))
} // }
//
time.Sleep(time.Second * 100) // time.Sleep(time.Second * 100)
} //}
//
func testNotify(val string) { //func testNotify(val string) {
//
result := make(map[string]string) // result := make(map[string]string)
result["a"] = val // result["a"] = val
result["b"] = val // result["b"] = val
result["c"] = val // result["c"] = val
//
cd := &Data{ // cd := &Data{
SensorId: "insanity", // SensorId: "insanity",
Data: result, // Data: result,
} // }
//
observer.Notify(messages.QUEUE_DATA, cd) // observer.Notify(messages.QUEUE_DATA, cd)
} //}

View File

@ -8,7 +8,7 @@ import (
"git.loafle.net/overflow/overflow_probe/agent_api/messages" "git.loafle.net/overflow/overflow_probe/agent_api/messages"
q "git.loafle.net/overflow/overflow_probe/queue" q "git.loafle.net/overflow/overflow_probe/queue"
"log" log "github.com/cihub/seelog"
"reflect" "reflect"
"sync" "sync"
) )
@ -20,7 +20,6 @@ var (
const ( const (
CENTRAL_ADDR = "192.168.1.105:50052" CENTRAL_ADDR = "192.168.1.105:50052"
SENDER_ID = "OVERFLOW_EVENT_SENDER"
DEFAULT_INTERVAL = 1 DEFAULT_INTERVAL = 1
) )
@ -113,7 +112,7 @@ func (es *EventSender) checkQueue() {
for sc := range es.sc { for sc := range es.sc {
items := reflect.ValueOf(sc) items := reflect.ValueOf(sc)
if items.Kind() != reflect.Slice { if items.Kind() != reflect.Slice {
log.Println("ddddd") log.Info("ddddd")
} }
for i := 0; i < items.Len(); i++ { for i := 0; i < items.Len(); i++ {
@ -131,11 +130,10 @@ func (es *EventSender) checkQueue() {
} }
func (es *EventSender) send(data []*messages.Event) { func (es *EventSender) send(data []*messages.Event) {
log.Println("Send Started")
conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure()) conn, err := grpc.Dial(CENTRAL_ADDR, grpc.WithInsecure())
if err != nil { if err != nil {
log.Fatal("Connection Error :", err.Error()) log.Error("Connection Error :", err.Error())
return return
} }
defer conn.Close() defer conn.Close()
@ -144,12 +142,9 @@ func (es *EventSender) send(data []*messages.Event) {
out, err := client.Status(context.Background(), &pb.Empty{}) out, err := client.Status(context.Background(), &pb.Empty{})
if err != nil { if err != nil {
log.Fatal("client Error:", err.Error()) log.Error("client Error:", err.Error())
return return
} }
log.Print(out) log.Debug(out)
} }
func agentIdentifier() string {
return "agentID_000000001"
}

View File

@ -7,7 +7,7 @@ import (
"os" "os"
) )
var KEY = []byte("qwerqwerqwerqwer") var KEY = []byte("GIDISLLWGHX.EIIEUYWLIGLWI")
func Encrypt(filePath, destName string) string { func Encrypt(filePath, destName string) string {

View File

@ -2,7 +2,7 @@ package queue
import ( import (
"container/heap" "container/heap"
"log" log "github.com/cihub/seelog"
"sync" "sync"
"time" "time"
) )
@ -53,7 +53,6 @@ func (lq *LoafleQueue) getItems() {
lq.senderChanel <- resultItems lq.senderChanel <- resultItems
lq.size = lq.Len() lq.size = lq.Len()
//log.Println("result length: ", len(resultItems))
resultItems = nil resultItems = nil
} }
} }
@ -100,7 +99,7 @@ func (lq *LoafleQueue) Close() {
func NewQueue(interval time.Duration, senderChanel chan interface{}) *LoafleQueue { func NewQueue(interval time.Duration, senderChanel chan interface{}) *LoafleQueue {
items := make([]*Item, 0) items := make([]*Item, 0)
log.Println("NewQueu Start") log.Info("New queue has started.")
lq := &LoafleQueue{ lq := &LoafleQueue{
items: items, items: items,
interval: interval, interval: interval,