commit f9345b120c225e70b62ec82ef054fafff68b6d42 Author: crusader Date: Sun Apr 1 18:59:30 2018 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..46c249c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +// Place your settings in this file to overwrite default and user settings. +{ + // Specifies Lint tool name. + "go.lintTool": "gometalinter", + + // Flags to pass to Lint tool (e.g. ["-min_confidence=.8"]) + "go.lintFlags": [ + "--config=${workspaceRoot}/golint.json" + ] + +} \ No newline at end of file diff --git a/consumer/consumer.go b/consumer/consumer.go new file mode 100644 index 0000000..23e9b1b --- /dev/null +++ b/consumer/consumer.go @@ -0,0 +1,70 @@ +package consumer + +import ( + "context" + "fmt" + "sync" + + "github.com/segmentio/kafka-go" +) + +type Consumer interface { + Start() error + Stop() error +} + +func New() Consumer { + return &KafkaConsumer{} +} + +type KafkaConsumer struct { + Consumer + + reader *kafka.Reader + + stopChan chan struct{} + stopWg sync.WaitGroup +} + +func (kc *KafkaConsumer) Start() error { + if kc.stopChan != nil { + return fmt.Errorf("Consumer: already running. Stop it before starting it again") + } + kc.stopChan = make(chan struct{}) + + kc.reader = kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"192.168.1.50:9092"}, + GroupID: "overflow-metric-group-id", + Topic: "overflow-metric-topic", + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + + if nil == kc.reader { + return fmt.Errorf("Consumer: cannot start kafka reader") + } + + kc.stopWg.Add(1) + go handleRun(kc) + + return nil +} + +func (kc *KafkaConsumer) Stop() error { + + return nil +} + +func handleRun(kc *KafkaConsumer) { + defer func() { + kc.stopWg.Done() + }() + + for { + m, err := kc.reader.ReadMessage(context.Background()) + if err != nil { + break + } + fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) + } +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..28fbfcf --- /dev/null +++ b/glide.yaml @@ -0,0 +1,3 @@ +package: git.loafle.net/overflow/probe-consumer-metric +import: +- package: github.com/segmentio/kafka-go diff --git a/main.go b/main.go new file mode 100644 index 0000000..7905807 --- /dev/null +++ b/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + +}