diff --git a/collector.go b/collector.go index 1af885a..9c10a2d 100644 --- a/collector.go +++ b/collector.go @@ -1,183 +1,99 @@ package collector_go import ( - "context" - "encoding/json" - "errors" - "fmt" - "io/ioutil" + confMng "loafle.com/overflow/agent_api/config_manager" s "loafle.com/overflow/collector_go/scheduler" - conf "loafle.com/overflow/crawler_go/config" - g "loafle.com/overflow/crawler_go/grpc" - crm "loafle.com/overflow/crawler_manager_go" "log" - "os" - "path/filepath" - "strings" + "sync" "time" ) -const CONFIG_ROOT = "/config/container" +var ( + instance *Collector + once sync.Once +) -type Collector struct { - scheduler s.Scheduler - //configs []*conf.Config - configs map[string]*conf.Config +func init() { + AddObservers() } -func (c *Collector) Start() { +func GetInstance() *Collector { + once.Do(func() { + instance = &Collector{} + }) + return instance +} + +func AddObservers() { + go handleConfigLoaded() +} + +type Collector struct { + scheduler s.Scheduler + cm confMng.ConfigManager + addSensorCh chan interface{} + remSensorCh chan interface{} +} + +func (c *Collector) start(conf confMng.ConfigManager) { go func() { - c.configs = make(map[string]*conf.Config, 0) - - if err := c.readAllConfig(); err != nil { - log.Println(err) - } - + c.cm = conf c.scheduler = s.Scheduler{} c.scheduler.Init() + c.addSensorCh = make(chan interface{}) + c.remSensorCh = make(chan interface{}) + handleSensorAdded(c.addSensorCh) + handleSensorRemoved(c.remSensorCh) - for _, conf := range c.configs { - if err := c.addSensor(conf); err != nil { + for _, conf := range c.cm.GetSensors() { + if err := c.addSensor(conf.Id); err != nil { log.Println(err) } } }() } -func (c *Collector) collect(id string) { - - conf := c.configs[id] - - log.Printf("COLLECT %s - [ID: %s] [Crawler : %s]", time.Now(), conf.Id, conf.Crawler.Name) - - conn, err := crm.GetInstance().GetClient(conf.Crawler.Container) - if err != nil { - log.Println(err) - } - defer conn.Close() - - dc := g.NewDataClient(conn) - in := &g.Input{} - - in.Id = id - in.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name]) - - out, err := dc.Get(context.Background(), in) - - if err != nil { - log.Println(err) - } - log.Println(out) -} - -//func (c *Collector) AddSensor(path string) { -// config := c.readConfig(CONFIG_ROOT + path) -// if config != nil { -// if err := c.addSensor(config); err != nil { -// log.Println(err) -// } -// } -//} - -func (c *Collector) AddSensor(conf *conf.Config) { - - if c.checkExist(conf.Id) { - log.Println("The Same Id already exists.") - return - } - c.configs[conf.Id] = conf - - if conf != nil { - if err := c.addSensor(conf); err != nil { - log.Println(err) - } - } -} - -func (c *Collector) RemoveSensor(id string) { - if err := c.scheduler.RemoveSchedule(id); err != nil { - log.Println(err) - } -} - -func (c *Collector) UpdateSensor(newConf *conf.Config) { - if newConf != nil { - if !c.checkExist(newConf.Id) { - log.Println("Cannot update Sensor : ID not exist [" + newConf.Id + "]") - return - } - - exConf := c.configs[newConf.Id] - - if exConf.Schedule.Interval != newConf.Schedule.Interval { - c.scheduler.UpdateSchedule(newConf.Id, newConf.Schedule.Interval) - } - - c.configs[newConf.Id] = newConf - } -} - func (c *Collector) Stop() { + cleanObserver(c.addSensorCh, c.remSensorCh) c.scheduler.RemoveAllSchedule() c.scheduler.Stop() } -func (c *Collector) addSensor(conf *conf.Config) error { - return c.scheduler.NewSchedule(conf.Id, conf.Schedule.Interval, c.collect) -} +func (c *Collector) collect(id string) { -func (c *Collector) readAllConfig() error { + conf := c.cm.GetSensorById(id) + log.Printf("COLLECT %s - [ID: %s] [Crawler : %s]", time.Now(), conf.Id, conf.Crawler.Name) - err := filepath.Walk(CONFIG_ROOT, func(path string, f os.FileInfo, err error) error { + /* + conn, err := crm.GetInstance().GetClient(conf.Crawler.Container) if err != nil { - return err + log.Println(err) } - if !f.IsDir() && strings.HasSuffix(f.Name(), ".conf") { - c.readConfig(path) + defer conn.Close() + + dc := g.NewDataClient(conn) + in := &g.Input{} + + in.Id = id + in.Name = g.Crawlers(g.Crawlers_value[conf.Crawler.Name]) + + out, err := dc.Get(context.Background(), in) + + if err != nil { + log.Println(err) } - return nil - }) - - if err != nil { - return err - } - - if len(c.configs) <= 0 { - return errors.New("No configuration file found.") - } - - return nil + log.Println(out) + */ } -func (c *Collector) readConfig(path string) *conf.Config { - bytes, err := ioutil.ReadFile(path) - if err != nil { - fmt.Println(err) - return nil - } - conf := conf.Config{} - json.Unmarshal(bytes, &conf) - - if err := c.validateConfig(&conf, path); err != nil { - fmt.Println(err) - return nil - } - c.configs[conf.Id] = &conf - return &conf +func (c *Collector) addSensor(sensorId string) error { + sensor := c.cm.GetSensorById(sensorId) + return c.scheduler.NewSchedule(sensorId, sensor.Schedule.Interval, c.collect) } -func (c *Collector) validateConfig(conf *conf.Config, configPath string) error { - - //todo : some validations - if c.checkExist(conf.Id) { - return errors.New("The Same Id already exists. " + configPath) +func (c *Collector) removeSensor(id string) { + if err := c.scheduler.RemoveSchedule(id); err != nil { + log.Println(err) + return } - return nil -} - -func (c *Collector) checkExist(id string) bool { - if _, exists := c.configs[id]; exists { - return true - } - return false } diff --git a/collector_event.go b/collector_event.go new file mode 100644 index 0000000..6f2a0fc --- /dev/null +++ b/collector_event.go @@ -0,0 +1,47 @@ +package collector_go + +import ( + "loafle.com/overflow/agent_api/config_manager" + "loafle.com/overflow/agent_api/observer" + "loafle.com/overflow/agent_api/observer/messages" +) + +func handleConfigLoaded() { + ch := make(chan interface{}, 0) + observer.Add(messages.CONFIGMANAGER_LOADED, ch) + + data := <-ch + confMng := data.(config_manager.ConfigManager) + + coll := GetInstance() + coll.start(confMng) + + observer.Remove(messages.CONFIGMANAGER_LOADED, ch) +} + +func handleSensorAdded(ch chan interface{}) { + coll := GetInstance() + observer.Add(messages.ADD_SENSOR_2_END, ch) + + for { + configId := <-ch + coll.addSensor(configId.(string)) + } +} + +func handleSensorRemoved(ch chan interface{}) { + coll := GetInstance() + observer.Add(messages.REMOVE_SENSOR_1, ch) + + for { + configId := <-ch + coll.removeSensor(configId.(string)) + observer.Notify(messages.REMOVE_SENSOR_2_END, configId) + } +} + +func cleanObserver(ach chan interface{}, rch chan interface{}) { + observer.Remove(messages.ADD_SENSOR_2_END, ach) + observer.Remove(messages.REMOVE_SENSOR_1, rch) + observer.Notify(messages.COLLECTOR_STOPPED, true) +} diff --git a/collector_test.go b/collector_test.go index 9c7d815..3c1e684 100644 --- a/collector_test.go +++ b/collector_test.go @@ -1,58 +1,28 @@ package collector_go import ( - "encoding/json" - "fmt" - "io/ioutil" - conf "loafle.com/overflow/crawler_go/config" - "loafle.com/overflow/cron_go" + s "loafle.com/overflow/collector_go/scheduler" "log" "testing" "time" ) -func TestCallGet(t *testing.T) { - c := Collector{} - c.Start() - time.Sleep(time.Second * 10) +func TestCollector(t *testing.T) { + sc := s.Scheduler{} + sc.Init() - //log.Println("add sensor") - //c.AddSensor("/network/smb/t2.conf") - //time.Sleep(time.Second * 3) - - log.Println("update sensor") - c.UpdateSensor(newConf()) - - time.Sleep(time.Second * 30) -} - -func newConf() *conf.Config { - bytes, err := ioutil.ReadFile("/config/container/network/smb/smb2.conf") - if err != nil { - fmt.Println(err) - return nil + for i := 0; i < 9999; i++ { + sc.NewSchedule(string(i), "5", test) } - conf := conf.Config{} - json.Unmarshal(bytes, &conf) - - return &conf -} - -func TestPooling(t *testing.T) { - c := &cron.Cron{} - c.Start() - c.AddTask("polling", 3).Invoke(handlePolling) - - time.Sleep(time.Second * 10) - fmt.Println("UPDATE") - c.UpdateTask("polling", 1) + sc.NewSchedule("#######################", "5", test) time.Sleep(time.Second * 10) - fmt.Println("UPDATE") - c.UpdateTask("polling", 3) - time.Sleep(time.Second * 10) + sc.NewSchedule("#######################", "1", test) + time.Sleep(time.Second * 100) } -func handlePolling() { - fmt.Println(time.Now(), " polling") +func test(id string) { + if id == "#######################" { + log.Println(id) + } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index fce870d..90ef53e 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -3,7 +3,6 @@ package scheduler import ( c "loafle.com/overflow/crawler_go" "loafle.com/overflow/cron_go" - "log" "strconv" "sync" ) @@ -28,6 +27,7 @@ func (s *Scheduler) Stop() { } func (s *Scheduler) NewSchedule(id, interval string, fn interface{}) error { + return s.newSchedule(id, interval, fn) } @@ -54,12 +54,3 @@ func (s *Scheduler) newSchedule(id string, interval string, fn interface{}) erro } return cron.AddTask(id, uint64(i)).Invoke(fn, id) } - -func (s *Scheduler) requestGet(id string) { - data, err := s.crawler.Get(id) - if err != nil { - log.Printf("[ID: %s] An error has occurred. %s", id, err.Error()) - return - } - log.Println(data) -}