package kafka import ( "context" "fmt" "git.loafle.net/commons/logging-go" occe "git.loafle.net/overflow/commons-go/config/external" "github.com/segmentio/kafka-go" ) var ( writers map[string]*kafka.Writer readers map[string]*kafka.Reader ) func InitPackage(config *occe.Kafka) { if nil == config { return } if nil != config.Producers && 0 < len(config.Producers) { writers = make(map[string]*kafka.Writer) for n, c := range config.Producers { wc := kafka.WriterConfig{ Brokers: c.Brokers, Topic: c.Topic, Balancer: &kafka.LeastBytes{}, MaxAttempts: c.MaxAttempts, QueueCapacity: c.QueueCapacity, BatchSize: c.BatchSize, BatchTimeout: c.BatchTimeout, ReadTimeout: c.ReadTimeout, WriteTimeout: c.WriteTimeout, RebalanceInterval: c.RebalanceInterval, RequiredAcks: c.RequiredAcks, Async: c.Async, } writers[n] = kafka.NewWriter(wc) } } if nil != config.Consumers && 0 < len(config.Consumers) { readers = make(map[string]*kafka.Reader) for n, c := range config.Consumers { rc := kafka.ReaderConfig{ Brokers: c.Brokers, GroupID: c.GroupID, Topic: c.Topic, QueueCapacity: c.QueueCapacity, MinBytes: c.MaxBytes, MaxBytes: c.MaxBytes, MaxWait: c.MaxWait, ReadLagInterval: c.ReadLagInterval, HeartbeatInterval: c.HeartbeatInterval, CommitInterval: c.CommitInterval, SessionTimeout: c.SessionTimeout, RebalanceTimeout: c.RebalanceTimeout, RetentionTime: c.RetentionTime, } readers[n] = kafka.NewReader(rc) } } } func StartPackage(config *occe.Kafka) { if nil == config { return } } func StopPackage(config *occe.Kafka) { if nil == config { return } } func DestroyPackage(config *occe.Kafka) { if nil == config { return } for _, w := range writers { if err := w.Close(); nil != err { logging.Logger().Errorf("%v", err) } } for _, r := range readers { if err := r.Close(); nil != err { logging.Logger().Errorf("%v", err) } } } func Writer(name string) *kafka.Writer { if nil == writers { return nil } return writers[name] } func Reader(name string) *kafka.Reader { if nil == readers { return nil } return readers[name] } func Write(name string, key []byte, value []byte) error { if nil == writers { return fmt.Errorf("Kafka client is not valid") } w, ok := writers[name] if !ok { return fmt.Errorf("Kafka client is not valid") } err := w.WriteMessages(context.Background(), kafka.Message{ Key: key, Value: value, }, ) return err }