ing
This commit is contained in:
		
							parent
							
								
									1742b7fab6
								
							
						
					
					
						commit
						4c55033eae
					
				@ -19,7 +19,7 @@ func InitPackage(config *occe.Kafka) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if nil != config.Producers || 0 < len(config.Producers) {
 | 
			
		||||
	if nil != config.Producers && 0 < len(config.Producers) {
 | 
			
		||||
		kafkaWriters = make(map[string]*kafka.Writer)
 | 
			
		||||
		for n, c := range config.Producers {
 | 
			
		||||
			wc := kafka.WriterConfig{
 | 
			
		||||
@ -40,7 +40,7 @@ func InitPackage(config *occe.Kafka) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if nil != config.Consumers || 0 < len(config.Consumers) {
 | 
			
		||||
	if nil != config.Consumers && 0 < len(config.Consumers) {
 | 
			
		||||
		kafkaReaders = make(map[string]*kafka.Reader)
 | 
			
		||||
		for n, c := range config.Consumers {
 | 
			
		||||
			rc := kafka.ReaderConfig{
 | 
			
		||||
@ -87,6 +87,12 @@ func DestroyPackage(config *occe.Kafka) {
 | 
			
		||||
			logging.Logger().Errorf("%v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, r := range kafkaReaders {
 | 
			
		||||
		if err := r.Close(); nil != err {
 | 
			
		||||
			logging.Logger().Errorf("%v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Writer(name string) *kafka.Writer {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user