// Package kafka keeps package-level registries of named kafka-go writers and
// readers that are built from the external Kafka configuration.
package kafka

import (
	"context"
	"fmt"

	"git.loafle.net/commons/logging-go"
	occe "git.loafle.net/overflow/commons-go/config/external"
	"github.com/segmentio/kafka-go"
)

var (
	// kafkaWriters maps a producer name from the configuration to its writer.
	kafkaWriters map[string]*kafka.Writer
	// kafkaReaders maps a consumer name from the configuration to its reader.
	kafkaReaders map[string]*kafka.Reader
)

// InitPackage builds one kafka.Writer per entry in config.Producers and one
// kafka.Reader per entry in config.Consumers, storing each under its
// configuration name. A nil config is ignored.
func InitPackage(config *occe.Kafka) {
	if nil == config {
		return
	}

	if 0 < len(config.Producers) {
		kafkaWriters = make(map[string]*kafka.Writer, len(config.Producers))

		for n, c := range config.Producers {
			wc := kafka.WriterConfig{
				Brokers:           c.Brokers,
				Topic:             c.Topic,
				Balancer:          &kafka.LeastBytes{},
				MaxAttempts:       c.MaxAttempts,
				QueueCapacity:     c.QueueCapacity,
				BatchSize:         c.BatchSize,
				BatchTimeout:      c.BatchTimeout,
				ReadTimeout:       c.ReadTimeout,
				WriteTimeout:      c.WriteTimeout,
				RebalanceInterval: c.RebalanceInterval,
				RequiredAcks:      c.RequiredAcks,
				Async:             c.Async,
			}
			kafkaWriters[n] = kafka.NewWriter(wc)
		}
	}

	if 0 < len(config.Consumers) {
		kafkaReaders = make(map[string]*kafka.Reader, len(config.Consumers))

		for n, c := range config.Consumers {
			rc := kafka.ReaderConfig{
				Brokers:       c.Brokers,
				GroupID:       c.GroupID,
				Topic:         c.Topic,
				QueueCapacity: c.QueueCapacity,
				// NOTE(review): MinBytes is populated from c.MaxBytes, which looks
				// like a copy/paste slip. Confirm whether the consumer config
				// exposes a MinBytes field and switch to it if so.
				MinBytes:          c.MaxBytes,
				MaxBytes:          c.MaxBytes,
				MaxWait:           c.MaxWait,
				ReadLagInterval:   c.ReadLagInterval,
				HeartbeatInterval: c.HeartbeatInterval,
				CommitInterval:    c.CommitInterval,
				SessionTimeout:    c.SessionTimeout,
				RebalanceTimeout:  c.RebalanceTimeout,
				RetentionTime:     c.RetentionTime,
			}
			kafkaReaders[n] = kafka.NewReader(rc)
		}
	}
}

// StartPackage is a lifecycle hook that currently performs no work.
func StartPackage(config *occe.Kafka) {
	if nil == config {
		return
	}
}

// StopPackage is a lifecycle hook that currently performs no work.
func StopPackage(config *occe.Kafka) {
	if nil == config {
		return
	}
}

// DestroyPackage closes every writer and every reader created by InitPackage.
// Close errors are logged and do not stop the remaining clients from being
// closed. A nil config is ignored.
func DestroyPackage(config *occe.Kafka) {
	if nil == config {
		return
	}

	for _, w := range kafkaWriters {
		if err := w.Close(); nil != err {
			logging.Logger().Errorf("%v", err)
		}
	}

	// Readers are created alongside the writers in InitPackage, so they must
	// be released here as well.
	for _, r := range kafkaReaders {
		if err := r.Close(); nil != err {
			logging.Logger().Errorf("%v", err)
		}
	}
}

// Writer returns the writer registered under name, or nil when there is none.
func Writer(name string) *kafka.Writer {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	return kafkaWriters[name]
}

// Reader returns the reader registered under name, or nil when there is none.
func Reader(name string) *kafka.Reader {
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	return kafkaReaders[name]
}

// Write sends a single message with the given key and value through the
// writer registered under name, using a background context. It returns an
// error when no writer is registered under name, or the error reported by
// WriteMessages.
func Write(name string, key []byte, value []byte) error {
	// A lookup on a nil map reports ok == false, which covers the
	// "package not initialised" case too.
	w, ok := kafkaWriters[name]
	if !ok {
		// Error text is kept unchanged in case callers match on it.
		return fmt.Errorf("Kafka client is not valid")
	}

	return w.WriteMessages(context.Background(), kafka.Message{
		Key:   key,
		Value: value,
	})
}