From 4c55033eaedd4430985dce7b4b40383b2f592605 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 11 May 2018 18:56:33 +0900 Subject: [PATCH] ing --- kafka/kafka.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kafka/kafka.go b/kafka/kafka.go index 55426c9..92453d7 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -19,7 +19,7 @@ func InitPackage(config *occe.Kafka) { return } - if nil != config.Producers || 0 < len(config.Producers) { + if nil != config.Producers && 0 < len(config.Producers) { kafkaWriters = make(map[string]*kafka.Writer) for n, c := range config.Producers { wc := kafka.WriterConfig{ @@ -40,7 +40,7 @@ func InitPackage(config *occe.Kafka) { } } - if nil != config.Consumers || 0 < len(config.Consumers) { + if nil != config.Consumers && 0 < len(config.Consumers) { kafkaReaders = make(map[string]*kafka.Reader) for n, c := range config.Consumers { rc := kafka.ReaderConfig{ @@ -87,6 +87,12 @@ func DestroyPackage(config *occe.Kafka) { logging.Logger().Errorf("%v", err) } } + + for _, r := range kafkaReaders { + if err := r.Close(); nil != err { + logging.Logger().Errorf("%v", err) + } + } } func Writer(name string) *kafka.Writer {