diff --git a/kafka/kafka.go b/kafka/kafka.go index 55426c9..92453d7 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -19,7 +19,7 @@ func InitPackage(config *occe.Kafka) { return } - if nil != config.Producers || 0 < len(config.Producers) { + if nil != config.Producers && 0 < len(config.Producers) { kafkaWriters = make(map[string]*kafka.Writer) for n, c := range config.Producers { wc := kafka.WriterConfig{ @@ -40,7 +40,7 @@ func InitPackage(config *occe.Kafka) { } } - if nil != config.Consumers || 0 < len(config.Consumers) { + if nil != config.Consumers && 0 < len(config.Consumers) { kafkaReaders = make(map[string]*kafka.Reader) for n, c := range config.Consumers { rc := kafka.ReaderConfig{ @@ -87,6 +87,12 @@ func DestroyPackage(config *occe.Kafka) { logging.Logger().Errorf("%v", err) } } + + for _, r := range kafkaReaders { + if err := r.Close(); nil != err { + logging.Logger().Errorf("%v", err) + } + } } func Writer(name string) *kafka.Writer {