package kafka

import (
	"context"
	"fmt"

	"git.loafle.net/commons/logging-go"
	ocec "git.loafle.net/overflow/commons-go/external/config"
	"github.com/segmentio/kafka-go"
)

// kafkaWriter is the package-level writer shared by every call to Write.
// It is nil until InitPackage assigns it, and DestroyPackage closes it.
var kafkaWriter *kafka.Writer

// InitPackage creates the package-level Kafka writer that Write publishes
// through. The writer uses the LeastBytes balancer.
//
// NOTE(review): config is not read here; the broker address and the topic
// are hard-coded below. Confirm whether they should be taken from config.
func InitPackage(config *ocec.Kafka) {
	writerConfig := kafka.WriterConfig{
		Brokers:  []string{"192.168.1.50:9092"},
		Topic:    "overflow-metric-topic",
		Balancer: &kafka.LeastBytes{},
	}

	kafkaWriter = kafka.NewWriter(writerConfig)
}

// StartPackage is the start hook of the package lifecycle. It currently
// performs no work, whether or not config is nil.
func StartPackage(config *ocec.Kafka) {
	if config == nil {
		return
	}
	// Nothing to start yet.
}

// StopPackage is the stop hook of the package lifecycle. It currently
// performs no work, whether or not config is nil.
func StopPackage(config *ocec.Kafka) {
	if config == nil {
		return
	}
	// Nothing to stop yet.
}

// DestroyPackage closes the package-level Kafka writer.
//
// It returns without doing anything when config is nil. It also returns
// when no writer has been created (InitPackage has not run), because calling
// Close through a nil *kafka.Writer is not safe. An error reported by Close
// is logged rather than returned, since this function has no return value.
func DestroyPackage(config *ocec.Kafka) {
	if config == nil || kafkaWriter == nil {
		return
	}

	if err := kafkaWriter.Close(); err != nil {
		logging.Logger().Errorf("%v", err)
	}
}

// Write publishes a single message with the given key and value through the
// package-level writer, using a background context.
//
// It returns an error when the writer has not been created, otherwise it
// returns whatever WriteMessages reports.
func Write(key []byte, value []byte) error {
	if kafkaWriter == nil {
		return fmt.Errorf("Kafka client is not valid")
	}

	message := kafka.Message{Key: key, Value: value}

	return kafkaWriter.WriteMessages(context.Background(), message)
}