collector
This commit is contained in:
		
							parent
							
								
									ea18aeab18
								
							
						
					
					
						commit
						58c6c696ad
					
				
							
								
								
									
										87
									
								
								collector.go
									
									
									
									
									
								
							
							
						
						
									
										87
									
								
								collector.go
									
									
									
									
									
								
							| @ -2,7 +2,7 @@ package collector_go | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	confMng "loafle.com/overflow/agent_api/config_manager" | 	cm "loafle.com/overflow/agent_api/config_manager" | ||||||
| 	"loafle.com/overflow/crawler_go/grpc" | 	"loafle.com/overflow/crawler_go/grpc" | ||||||
| 	crm "loafle.com/overflow/crawler_manager_go" | 	crm "loafle.com/overflow/crawler_manager_go" | ||||||
| 	s "loafle.com/overflow/scheduler_go" | 	s "loafle.com/overflow/scheduler_go" | ||||||
| @ -17,8 +17,22 @@ var ( | |||||||
| 	once     sync.Once | 	once     sync.Once | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func init() { | type Collector struct { | ||||||
| 	go handleConfigLoaded() | 	scheduler *s.Scheduler | ||||||
|  | 	cm        cm.ConfigManager | ||||||
|  | 	dataCh chan interface{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func Start(started chan bool, dataCh chan interface{}, conf cm.ConfigManager) { | ||||||
|  | 	c := GetInstance() | ||||||
|  | 	c.dataCh = dataCh | ||||||
|  | 	c.start(started, conf) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func Stop(stopped chan bool) { | ||||||
|  | 	c := GetInstance() | ||||||
|  | 	c.stop() | ||||||
|  | 	stopped <- true | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func GetInstance() *Collector { | func GetInstance() *Collector { | ||||||
| @ -28,63 +42,27 @@ func GetInstance() *Collector { | |||||||
| 	return instance | 	return instance | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type Collector struct { |  | ||||||
| 	scheduler *s.Scheduler |  | ||||||
| 	cm        confMng.ConfigManager |  | ||||||
| 
 | 
 | ||||||
| 	startSensorCh     chan interface{} |  | ||||||
| 	stopSensorCh      chan interface{} |  | ||||||
| 	addSensorCh       chan interface{} |  | ||||||
| 	remSensorCh       chan interface{} |  | ||||||
| 	updateSensorCh    chan interface{} |  | ||||||
| 	crmSensorUpdateCh chan interface{} |  | ||||||
| 	crmUpdateCh       chan interface{} |  | ||||||
| 	crmUpdateDoneCh   chan interface{} |  | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
| func (c *Collector) start(conf confMng.ConfigManager) { | func (c *Collector) start(started chan bool, conf cm.ConfigManager) { | ||||||
| 	go func() { | 	go func() { | ||||||
| 		c.cm = conf | 		c.cm = conf | ||||||
| 		c.scheduler = &s.Scheduler{} | 		c.scheduler = &s.Scheduler{} | ||||||
| 		c.scheduler.Start() | 		c.scheduler.Start() | ||||||
| 		c.addObservers() |  | ||||||
| 		c.notifyCollectorReady() |  | ||||||
| 
 | 
 | ||||||
| 		for _, conf := range c.cm.GetSensors() { | 		for _, conf := range c.cm.GetSensors() { | ||||||
| 			if err := c.addSensor(conf.Id); err != nil { | 			if err := c.addSensor(conf.Id); err != nil { | ||||||
| 				log.Println(err) | 				log.Println(err) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		started <- true | ||||||
| 	}() | 	}() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Collector) addObservers() { |  | ||||||
| 
 | 
 | ||||||
| 	c.startSensorCh = make(chan interface{}) | func (c *Collector) stop() { | ||||||
| 	c.stopSensorCh = make(chan interface{}) |  | ||||||
| 	c.addSensorCh = make(chan interface{}) |  | ||||||
| 	c.remSensorCh = make(chan interface{}) |  | ||||||
| 	c.updateSensorCh = make(chan interface{}) |  | ||||||
| 	c.crmSensorUpdateCh = make(chan interface{}) |  | ||||||
| 	c.crmUpdateCh = make(chan interface{}) |  | ||||||
| 	c.crmUpdateDoneCh = make(chan interface{}) |  | ||||||
| 
 |  | ||||||
| 	go c.handleAgentWillStop() |  | ||||||
| 	go c.handleSensorStart() |  | ||||||
| 	go c.handleSensorStop() |  | ||||||
| 	go c.handleSensorAdd() |  | ||||||
| 	go c.handleSensorRemove() |  | ||||||
| 	go c.handleSensorUpdate() |  | ||||||
| 	go c.handleCrmSensorUpdateDone() |  | ||||||
| 	go c.handleCrawlerUpdate() |  | ||||||
| 	go c.handleCrawlerUpdateDone() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) Stop(d interface{}) { |  | ||||||
| 	c.cleanObserver(c.addSensorCh, c.remSensorCh) |  | ||||||
| 	c.scheduler.RemoveAllSchedule() | 	c.scheduler.RemoveAllSchedule() | ||||||
| 	c.scheduler.Stop() | 	c.scheduler.Stop() | ||||||
| 	c.notifyCollectorStopped() |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Collector) collect(id string) { | func (c *Collector) collect(id string) { | ||||||
| @ -108,9 +86,9 @@ func (c *Collector) collect(id string) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Println(err) | 		log.Println(err) | ||||||
| 	} | 	} | ||||||
| 	log.Println(out) | 	log.Println("collector get result : ", out) | ||||||
|  | 	c.dataCh <- out | ||||||
| 
 | 
 | ||||||
| 	c.notifyData(out) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Collector) addSensor(sensorId string) error { | func (c *Collector) addSensor(sensorId string) error { | ||||||
| @ -122,12 +100,29 @@ func (c *Collector) addSensor(sensorId string) error { | |||||||
| 	return c.scheduler.NewSchedule(sensorId, uint64(interval), c.collect) | 	return c.scheduler.NewSchedule(sensorId, uint64(interval), c.collect) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Collector) removeSensor(id string) { | func (c *Collector) removeSensor(id string) error { | ||||||
| 	if err := c.scheduler.RemoveSchedule(id); err != nil { | 	if err := c.scheduler.RemoveSchedule(id); err != nil { | ||||||
| 		log.Println(err) | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (c *Collector) updateSensor(id string) { | func (c *Collector) updateSensor(id string) { | ||||||
| 	//update | 	//update | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func AddSensor(id string) error { | ||||||
|  | 	return GetInstance().addSensor(id) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func RemSensor(id string) error { | ||||||
|  | 	return GetInstance().removeSensor(id) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func StartSensor(id string) error { | ||||||
|  | 	return GetInstance().scheduler.StartSchedule(id) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func StopSensor(id string) error { | ||||||
|  | 	return GetInstance().scheduler.StopSchedule(id) | ||||||
|  | } | ||||||
| @ -1,108 +0,0 @@ | |||||||
| package collector_go |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"loafle.com/overflow/agent_api/config_manager" |  | ||||||
| 	"loafle.com/overflow/agent_api/observer" |  | ||||||
| 	"loafle.com/overflow/agent_api/observer/messages" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| func handleConfigLoaded() { |  | ||||||
| 	ch := make(chan interface{}, 0) |  | ||||||
| 	observer.Add(messages.CRM_READY, ch) |  | ||||||
| 
 |  | ||||||
| 	data := <-ch |  | ||||||
| 	confMng := data.(config_manager.ConfigManager) |  | ||||||
| 
 |  | ||||||
| 	coll := GetInstance() |  | ||||||
| 	coll.start(confMng) |  | ||||||
| 	//observer.Remove(messages.CRM_READY, ch) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) notifyCollectorReady() { |  | ||||||
| 	observer.Notify(messages.CLT_READY, nil) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) notifyCollectorStopped() { |  | ||||||
| 	observer.Notify(messages.CLT_STOPPED, nil) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleAgentWillStop() { |  | ||||||
| 	ch := make(chan interface{}, 0) |  | ||||||
| 	observer.Add(messages.AGT_WILL_STOP, ch) |  | ||||||
| 	data := <-ch |  | ||||||
| 	c.Stop(data) |  | ||||||
| 	//observer.Remove(messages.AGT_WILL_STOP, ch) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleSensorStart() { |  | ||||||
| 	observer.Add(messages.TASK_SENSOR_START, c.startSensorCh) |  | ||||||
| 	configId := <-c.startSensorCh |  | ||||||
| 	err := c.addSensor(configId.(string)) |  | ||||||
| 	if err == nil { |  | ||||||
| 		//todo task done |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleSensorStop() { |  | ||||||
| 	observer.Add(messages.TASK_SENSOR_STOP, c.stopSensorCh) |  | ||||||
| 	configId := <-c.stopSensorCh |  | ||||||
| 	c.removeSensor(configId.(string)) |  | ||||||
| 	//todo task done |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleSensorAdd() { |  | ||||||
| 	observer.Add(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh) |  | ||||||
| 	configId := <-c.addSensorCh |  | ||||||
| 	err := c.addSensor(configId.(string)) |  | ||||||
| 	if err == nil { |  | ||||||
| 		//todo task done |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleSensorRemove() { |  | ||||||
| 	observer.Add(messages.TASK_SENSOR_REMOVE, c.remSensorCh) |  | ||||||
| 	configId := <-c.remSensorCh |  | ||||||
| 	c.removeSensor(configId.(string)) |  | ||||||
| 	observer.Notify(messages.CLT_SENSOR_REMOVE_DONE, configId) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleSensorUpdate() { |  | ||||||
| 	observer.Add(messages.TASK_SENSOR_UPDATE, c.updateSensorCh) |  | ||||||
| 
 |  | ||||||
| 	configId := <-c.updateSensorCh |  | ||||||
| 	//todo update logic |  | ||||||
| 	observer.Notify(messages.CLT_SENSOR_UPDATE_DONE, configId) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleCrmSensorUpdateDone() { |  | ||||||
| 	observer.Add(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh) |  | ||||||
| 
 |  | ||||||
| 	//configId := <-c.crmSensorUpdateCh |  | ||||||
| 	//todo update logic |  | ||||||
| 	//todo task done |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleCrawlerUpdate() { |  | ||||||
| 	observer.Add(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh) |  | ||||||
| 	//todo. do what collector has to do |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) handleCrawlerUpdateDone() { |  | ||||||
| 	observer.Add(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh) |  | ||||||
| 	//todo. task done |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) cleanObserver(ach chan interface{}, rch chan interface{}) { |  | ||||||
| 	observer.Remove(messages.TASK_SENSOR_START, c.startSensorCh) |  | ||||||
| 	observer.Remove(messages.TASK_SENSOR_STOP, c.stopSensorCh) |  | ||||||
| 	observer.Remove(messages.CRM_SENSOR_ADD_DONE, c.addSensorCh) |  | ||||||
| 	observer.Remove(messages.TASK_SENSOR_REMOVE, c.remSensorCh) |  | ||||||
| 	observer.Remove(messages.TASK_SENSOR_UPDATE, c.updateSensorCh) |  | ||||||
| 	observer.Remove(messages.TASK_CRAWLER_UPDATE, c.crmUpdateCh) |  | ||||||
| 	observer.Remove(messages.CRM_SENSOR_UPDATE_DONE, c.crmSensorUpdateCh) |  | ||||||
| 	observer.Remove(messages.CRM_UPDATE_DONE, c.crmUpdateDoneCh) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (c *Collector) notifyData(data interface{}) { |  | ||||||
| 	observer.Notify(messages.QUEUE_DATA, data) |  | ||||||
| } |  | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user