package kafka import ( "context" "fmt" "git.loafle.net/commons/logging-go" ocec "git.loafle.net/overflow/commons-go/external/config" "github.com/segmentio/kafka-go" ) var ( kafkaWriter *kafka.Writer ) func InitPackage(config *ocec.Kafka) { kafkaWriter = kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"192.168.1.50:9092"}, Topic: "overflow-metric-topic", Balancer: &kafka.LeastBytes{}, }) } func StartPackage(config *ocec.Kafka) { if nil == config { return } } func StopPackage(config *ocec.Kafka) { if nil == config { return } } func DestroyPackage(config *ocec.Kafka) { if nil == config { return } if err := kafkaWriter.Close(); nil != err { logging.Logger().Errorf("%v", err) } } func Write(key []byte, value []byte) error { if nil == kafkaWriter { return fmt.Errorf("Kafka client is not valid") } err := kafkaWriter.WriteMessages(context.Background(), kafka.Message{ Key: key, Value: value, }, ) return err }