From 1742b7fab61417bf75111b264c7ff0da2d708c01 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 11 May 2018 17:53:55 +0900 Subject: [PATCH] ing --- kafka/kafka.go | 73 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/kafka/kafka.go b/kafka/kafka.go index 73fd5d9..55426c9 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -11,34 +11,55 @@ import ( var ( kafkaWriters map[string]*kafka.Writer + kafkaReaders map[string]*kafka.Reader ) func InitPackage(config *occe.Kafka) { if nil == config { return } - if nil == config.Producers || 0 == len(config.Producers) { - return + + if nil != config.Producers || 0 < len(config.Producers) { + kafkaWriters = make(map[string]*kafka.Writer) + for n, c := range config.Producers { + wc := kafka.WriterConfig{ + Brokers: c.Brokers, + Topic: c.Topic, + Balancer: &kafka.LeastBytes{}, + MaxAttempts: c.MaxAttempts, + QueueCapacity: c.QueueCapacity, + BatchSize: c.BatchSize, + BatchTimeout: c.BatchTimeout, + ReadTimeout: c.ReadTimeout, + WriteTimeout: c.WriteTimeout, + RebalanceInterval: c.RebalanceInterval, + RequiredAcks: c.RequiredAcks, + Async: c.Async, + } + kafkaWriters[n] = kafka.NewWriter(wc) + } } - kafkaWriters = make(map[string]*kafka.Writer) - - for n, c := range config.Producers { - wc := kafka.WriterConfig{ - Brokers: c.Brokers, - Topic: c.Topic, - Balancer: &kafka.LeastBytes{}, - MaxAttempts: c.MaxAttempts, - QueueCapacity: c.QueueCapacity, - BatchSize: c.BatchSize, - BatchTimeout: c.BatchTimeout, - ReadTimeout: c.ReadTimeout, - WriteTimeout: c.WriteTimeout, - RebalanceInterval: c.RebalanceInterval, - RequiredAcks: c.RequiredAcks, - Async: c.Async, + if nil != config.Consumers || 0 < len(config.Consumers) { + kafkaReaders = make(map[string]*kafka.Reader) + for n, c := range config.Consumers { + rc := kafka.ReaderConfig{ + Brokers: c.Brokers, + GroupID: c.GroupID, + Topic: c.Topic, + QueueCapacity: c.QueueCapacity, + MinBytes: c.MaxBytes, + MaxBytes: c.MaxBytes, + MaxWait: c.MaxWait, + ReadLagInterval: c.ReadLagInterval, + HeartbeatInterval: c.HeartbeatInterval, + CommitInterval: c.CommitInterval, + SessionTimeout: c.SessionTimeout, + RebalanceTimeout: c.RebalanceTimeout, + RetentionTime: c.RetentionTime, + } + kafkaReaders[n] = kafka.NewReader(rc) } - kafkaWriters[n] = kafka.NewWriter(wc) } } @@ -68,6 +89,20 @@ func DestroyPackage(config *occe.Kafka) { } } +func Writer(name string) *kafka.Writer { + if nil == kafkaWriters { + return nil + } + return kafkaWriters[name] +} + +func Reader(name string) *kafka.Reader { + if nil == kafkaReaders { + return nil + } + return kafkaReaders[name] +} + func Write(name string, key []byte, value []byte) error { if nil == kafkaWriters { return fmt.Errorf("Kafka client is not valid")