From fedd1286499d154fdc33d4301ac8191eb0735c16 Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 11 May 2018 19:06:30 +0900 Subject: [PATCH] ing --- consumer-handler.go | 8 ++++++++ consumer.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/consumer-handler.go b/consumer-handler.go index 16a5cf3..82e55ae 100644 --- a/consumer-handler.go +++ b/consumer-handler.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" "sync/atomic" + + "github.com/segmentio/kafka-go" ) type ConsumerHandler interface { @@ -17,6 +19,8 @@ type ConsumerHandler interface { OnStop(consumerCtx ConsumerCtx) Destroy(consumerCtx ConsumerCtx) + OnMessage(msg *kafka.Message) + Validate() error } @@ -47,6 +51,10 @@ func (ch *ConsumerHandlers) Destroy(consumerCtx ConsumerCtx) { } +func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { + fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) +} + func (ch *ConsumerHandlers) GetName() string { return ch.Name } diff --git a/consumer.go b/consumer.go index c3606e5..ee291cf 100644 --- a/consumer.go +++ b/consumer.go @@ -140,7 +140,7 @@ func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error continue } - fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) + c.ConsumerHandler.OnMessage(&m) } }