probe-consumer-metric/consumer/consumer.go

71 lines
1.2 KiB
Go
Raw Normal View History

2018-04-01 09:59:30 +00:00
package consumer
import (
"context"
"fmt"
"sync"
"github.com/segmentio/kafka-go"
)
type Consumer interface {
Start() error
Stop() error
}
func New() Consumer {
return &KafkaConsumer{}
}
type KafkaConsumer struct {
Consumer
reader *kafka.Reader
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (kc *KafkaConsumer) Start() error {
if kc.stopChan != nil {
return fmt.Errorf("Consumer: already running. Stop it before starting it again")
}
kc.stopChan = make(chan struct{})
kc.reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"192.168.1.50:9092"},
GroupID: "overflow-metric-group-id",
Topic: "overflow-metric-topic",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
if nil == kc.reader {
return fmt.Errorf("Consumer: cannot start kafka reader")
}
kc.stopWg.Add(1)
go handleRun(kc)
return nil
}
func (kc *KafkaConsumer) Stop() error {
return nil
}
func handleRun(kc *KafkaConsumer) {
defer func() {
kc.stopWg.Done()
}()
for {
m, err := kc.reader.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
}