From 1430aa098cac914950aaa3df435cd45d13734fef Mon Sep 17 00:00:00 2001 From: crusader Date: Tue, 8 May 2018 12:41:55 +0900 Subject: [PATCH] ing --- external/kafka/kafka.go | 48 +++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/external/kafka/kafka.go b/external/kafka/kafka.go index aa7939c..73fd5d9 100644 --- a/external/kafka/kafka.go +++ b/external/kafka/kafka.go @@ -10,17 +10,36 @@ import ( ) var ( - kafkaWriter *kafka.Writer + kafkaWriters map[string]*kafka.Writer ) func InitPackage(config *occe.Kafka) { + if nil == config { + return + } + if nil == config.Producers || 0 == len(config.Producers) { + return + } - kafkaWriter = kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{"192.168.1.50:9092"}, - Topic: "overflow-metric-topic", - Balancer: &kafka.LeastBytes{}, - }) + kafkaWriters = make(map[string]*kafka.Writer) + for n, c := range config.Producers { + wc := kafka.WriterConfig{ + Brokers: c.Brokers, + Topic: c.Topic, + Balancer: &kafka.LeastBytes{}, + MaxAttempts: c.MaxAttempts, + QueueCapacity: c.QueueCapacity, + BatchSize: c.BatchSize, + BatchTimeout: c.BatchTimeout, + ReadTimeout: c.ReadTimeout, + WriteTimeout: c.WriteTimeout, + RebalanceInterval: c.RebalanceInterval, + RequiredAcks: c.RequiredAcks, + Async: c.Async, + } + kafkaWriters[n] = kafka.NewWriter(wc) + } } func StartPackage(config *occe.Kafka) { @@ -42,17 +61,24 @@ func DestroyPackage(config *occe.Kafka) { return } - if err := kafkaWriter.Close(); nil != err { - logging.Logger().Errorf("%v", err) + for _, w := range kafkaWriters { + if err := w.Close(); nil != err { + logging.Logger().Errorf("%v", err) + } } } -func Write(key []byte, value []byte) error { - if nil == kafkaWriter { +func Write(name string, key []byte, value []byte) error { + if nil == kafkaWriters { return fmt.Errorf("Kafka client is not valid") } - err := kafkaWriter.WriteMessages(context.Background(), + w, ok := kafkaWriters[name] + if !ok { + return fmt.Errorf("Kafka client is not valid") + } + + err := w.WriteMessages(context.Background(), kafka.Message{ Key: key, Value: value,