commit fa0f700a9bf014389177f4308ee3604802c84d09 Author: crusader Date: Fri May 11 18:23:08 2018 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/consumer-context.go b/consumer-context.go new file mode 100644 index 0000000..f2ccfe5 --- /dev/null +++ b/consumer-context.go @@ -0,0 +1,19 @@ +package consumer + +import ( + cuc "git.loafle.net/commons/util-go/context" +) + +type ConsumerCtx interface { + cuc.Context +} + +func NewConsumerCtx(parent cuc.Context) ConsumerCtx { + return &consumerCtx{ + Context: cuc.NewContext(parent), + } +} + +type consumerCtx struct { + cuc.Context +} diff --git a/consumer-handler.go b/consumer-handler.go new file mode 100644 index 0000000..16a5cf3 --- /dev/null +++ b/consumer-handler.go @@ -0,0 +1,69 @@ +package consumer + +import ( + "fmt" + "strings" + "sync/atomic" +) + +type ConsumerHandler interface { + GetName() string + GetConsumerName() string + + ConsumerCtx() ConsumerCtx + + Init(consumerCtx ConsumerCtx) error + OnStart(consumerCtx ConsumerCtx) error + OnStop(consumerCtx ConsumerCtx) + Destroy(consumerCtx ConsumerCtx) + + Validate() error +} + +type ConsumerHandlers struct { + Name string `json:"name,omitempty"` + ConsumerName string `json:"consumerName,omitempty"` + + validated atomic.Value +} + +func (ch *ConsumerHandlers) ConsumerCtx() ConsumerCtx { + return NewConsumerCtx(nil) +} + +func (ch *ConsumerHandlers) Init(consumerCtx ConsumerCtx) error { + return nil +} + +func (ch *ConsumerHandlers) OnStart(consumerCtx ConsumerCtx) error { + return nil +} + +func (ch *ConsumerHandlers) OnStop(consumerCtx ConsumerCtx) { + +} + +func (ch *ConsumerHandlers) Destroy(consumerCtx ConsumerCtx) { + +} + +func (ch *ConsumerHandlers) GetName() string { + return ch.Name +} + +func (ch *ConsumerHandlers) GetConsumerName() string { + return ch.ConsumerName +} + +func (ch *ConsumerHandlers) Validate() error { + if nil != ch.validated.Load() { + return nil + } + ch.validated.Store(true) + + if "" == strings.TrimSpace(ch.ConsumerName) { + return fmt.Errorf("ConsumerName is not valid") + } + + return nil +} diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..358dc84 --- /dev/null +++ b/consumer.go @@ -0,0 +1,146 @@ +package consumer + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "git.loafle.net/commons/logging-go" + oek "git.loafle.net/overflow/external-go/kafka" + "github.com/segmentio/kafka-go" +) + +type Consumer struct { + ConsumerHandler ConsumerHandler + + ctx ConsumerCtx + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (c *Consumer) Start() error { + if c.stopChan != nil { + return fmt.Errorf("%s already running. Stop it before starting it again", c.logHeader()) + } + + if nil == c.ConsumerHandler { + return fmt.Errorf("%s ConsumerHandler must be specified", c.logHeader()) + } + if err := c.ConsumerHandler.Validate(); nil != err { + return fmt.Errorf("%s validate error %v", c.logHeader(), err) + } + + c.ctx = c.ConsumerHandler.ConsumerCtx() + if nil == c.ctx { + return fmt.Errorf("%s ConsumerCtx is nil", c.logHeader()) + } + + reader := oek.Reader(c.ConsumerHandler.GetConsumerName()) + if nil == reader { + return fmt.Errorf("%s Reader[%s] is not valid", c.logHeader(), c.ConsumerHandler.GetConsumerName()) + } + + if err := c.ConsumerHandler.Init(c.ctx); nil != err { + return fmt.Errorf("%s Init error %v", c.logHeader(), err) + } + + c.stopChan = make(chan struct{}) + + c.stopWg.Add(1) + go c.handleConsumer(reader) + + return nil +} + +func (c *Consumer) Stop(ctx context.Context) error { + if c.stopChan == nil { + return fmt.Errorf("%s must be started before stopping it", c.logHeader()) + } + close(c.stopChan) + c.stopWg.Wait() + + c.ConsumerHandler.Destroy(c.ctx) + + c.stopChan = nil + + return nil +} + +func (c *Consumer) logHeader() string { + return fmt.Sprintf("Consumer[%s]:", c.ConsumerHandler.GetName()) +} + +func (c *Consumer) handleConsumer(reader *kafka.Reader) { + defer func() { + c.ConsumerHandler.OnStop(c.ctx) + logging.Logger().Infof("%s Stopped", c.logHeader()) + c.stopWg.Done() + }() + + if err := c.ConsumerHandler.OnStart(c.ctx); nil != err { + logging.Logger().Error(err) + return + } + + stopChan := make(chan struct{}) + receiveDoneChan := make(chan error) + + go c.handleReceive(stopChan, receiveDoneChan, reader) + + logging.Logger().Infof("%s Started", c.logHeader()) + + select { + case <-receiveDoneChan: + close(stopChan) + case <-c.stopChan: + close(stopChan) + <-receiveDoneChan + } +} + +func (c *Consumer) handleReceive(stopChan <-chan struct{}, doneChan chan<- error, reader *kafka.Reader) { + var ( + m kafka.Message + stopping atomic.Value + err error + ) + + defer func() { + doneChan <- err + }() + + for { + readChan := make(chan struct{}) + go func() { + m, err = reader.ReadMessage(context.Background()) + if err != nil { + if nil == stopping.Load() { + logging.Logger().Errorf("%s %v", c.logHeader(), err) + } + } + + close(readChan) + }() + + select { + case <-c.stopChan: + stopping.Store(true) + return + case <-readChan: + } + + if nil != err { + select { + case <-c.stopChan: + return + case <-time.After(time.Second): + } + continue + } + + fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) + } + +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..580126d --- /dev/null +++ b/glide.yaml @@ -0,0 +1,10 @@ +package: git.loafle.net/overflow/consumer +import: +- package: git.loafle.net/commons/logging-go +- package: git.loafle.net/commons/util-go + subpackages: + - context +- package: git.loafle.net/overflow/external-go + subpackages: + - kafka +- package: github.com/segmentio/kafka-go