// Source path: gateway/external/kafka/kafka.go (64 lines, 988 B, Go).
// Revision timestamp: 2018-04-09 11:37:54 +00:00
package kafka
import (
"context"
"fmt"
"git.loafle.net/commons/logging-go"
// Revision timestamp: 2018-04-26 07:58:25 +00:00
occe "git.loafle.net/overflow/commons-go/config/external"
// Revision timestamp: 2018-04-09 11:37:54 +00:00
"github.com/segmentio/kafka-go"
)
var (
kafkaWriter *kafka.Writer
)
// Revision timestamp: 2018-04-26 07:58:25 +00:00
func InitPackage(config *occe.Kafka) {
2018-04-10 05:21:48 +00:00
kafkaWriter = kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"192.168.1.50:9092"},
Topic: "overflow-metric-topic",
Balancer: &kafka.LeastBytes{},
})
2018-04-09 11:37:54 +00:00
}
// Revision timestamp: 2018-04-26 07:58:25 +00:00
func StartPackage(config *occe.Kafka) {
2018-04-10 05:21:48 +00:00
if nil == config {
return
}
}
// Revision timestamp: 2018-04-26 07:58:25 +00:00
func StopPackage(config *occe.Kafka) {
2018-04-10 05:21:48 +00:00
if nil == config {
return
}
}
// Revision timestamp: 2018-04-26 07:58:25 +00:00
func DestroyPackage(config *occe.Kafka) {
2018-04-10 05:21:48 +00:00
if nil == config {
return
}
if err := kafkaWriter.Close(); nil != err {
logging.Logger().Errorf("%v", err)
}
2018-04-09 11:37:54 +00:00
}
// Write publishes a single key/value message through the package-level Kafka
// writer using a background context.
//
// It returns an error when the writer has not been created; otherwise it
// returns whatever WriteMessages reports.
func Write(key []byte, value []byte) error {
	if kafkaWriter == nil {
		return fmt.Errorf("Kafka client is not valid")
	}

	msg := kafka.Message{Key: key, Value: value}
	return kafkaWriter.WriteMessages(context.Background(), msg)
}