package kafka import ( "context" "fmt" "git.loafle.net/commons/logging-go" occe "git.loafle.net/overflow/commons-go/config/external" "github.com/segmentio/kafka-go" ) var ( kafkaWriters map[string]*kafka.Writer ) func InitPackage(config *occe.Kafka) { if nil == config { return } if nil == config.Producers || 0 == len(config.Producers) { return } kafkaWriters = make(map[string]*kafka.Writer) for n, c := range config.Producers { wc := kafka.WriterConfig{ Brokers: c.Brokers, Topic: c.Topic, Balancer: &kafka.LeastBytes{}, MaxAttempts: c.MaxAttempts, QueueCapacity: c.QueueCapacity, BatchSize: c.BatchSize, BatchTimeout: c.BatchTimeout, ReadTimeout: c.ReadTimeout, WriteTimeout: c.WriteTimeout, RebalanceInterval: c.RebalanceInterval, RequiredAcks: c.RequiredAcks, Async: c.Async, } kafkaWriters[n] = kafka.NewWriter(wc) } } func StartPackage(config *occe.Kafka) { if nil == config { return } } func StopPackage(config *occe.Kafka) { if nil == config { return } } func DestroyPackage(config *occe.Kafka) { if nil == config { return } for _, w := range kafkaWriters { if err := w.Close(); nil != err { logging.Logger().Errorf("%v", err) } } } func Write(name string, key []byte, value []byte) error { if nil == kafkaWriters { return fmt.Errorf("Kafka client is not valid") } w, ok := kafkaWriters[name] if !ok { return fmt.Errorf("Kafka client is not valid") } err := w.WriteMessages(context.Background(), kafka.Message{ Key: key, Value: value, }, ) return err }