commit 3837faafb2ca7ee1fa8d5592b684c41b6ee0b913 Author: crusader Date: Fri May 11 19:07:17 2018 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..46c249c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +// Place your settings in this file to overwrite default and user settings. +{ + // Specifies Lint tool name. + "go.lintTool": "gometalinter", + + // Flags to pass to Lint tool (e.g. ["-min_confidence=.8"]) + "go.lintFlags": [ + "--config=${workspaceRoot}/golint.json" + ] + +} \ No newline at end of file diff --git a/config.json b/config.json new file mode 100644 index 0000000..7a2088b --- /dev/null +++ b/config.json @@ -0,0 +1,21 @@ +{ + "external": { + "grpc": { + "network": "tcp4", + "address": "192.168.1.50:50006" + }, + "kafka": { + "consumers": { + "metric": { + "brokers": [ + "192.168.1.50:9092" + ], + "groupID": "overflow-metric-group-id", + "topic": "overflow-metric-topic", + "minBytes": 10000, + "maxBytes": 1000000 + } + } + } + } +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..f9d48ef --- /dev/null +++ b/config/config.go @@ -0,0 +1,9 @@ +package config + +import ( + occe "git.loafle.net/overflow/commons-go/config/external" +) + +type Config struct { + External *occe.External `json:"external"` +} diff --git a/consumer/consumer-handler.go b/consumer/consumer-handler.go new file mode 100644 index 0000000..d2e4bb9 --- /dev/null +++ b/consumer/consumer-handler.go @@ -0,0 +1,69 @@ +package consumer + +import ( + "sync/atomic" + + oc "git.loafle.net/overflow/consumer" + "git.loafle.net/overflow/consumer_metric/config" + oe "git.loafle.net/overflow/external-go" + "github.com/segmentio/kafka-go" +) + +type ConsumerHandler interface { + oc.ConsumerHandler +} + +type ConsumerHandlers struct { + oc.ConsumerHandlers + + Config *config.Config + + validated atomic.Value +} + +func (ch *ConsumerHandlers) Init(consumerCtx oc.ConsumerCtx) error { + if err := ch.ConsumerHandlers.Init(consumerCtx); nil != err { + return err + } + oe.InitPackage(ch.Config.External) + + return nil +} + +func (ch *ConsumerHandlers) OnStart(consumerCtx oc.ConsumerCtx) error { + if err := ch.ConsumerHandlers.OnStart(consumerCtx); nil != err { + return err + } + oe.StartPackage(ch.Config.External) + + return nil +} + +func (ch *ConsumerHandlers) OnStop(consumerCtx oc.ConsumerCtx) { + oe.StopPackage(ch.Config.External) + + ch.ConsumerHandlers.OnStop(consumerCtx) +} + +func (ch *ConsumerHandlers) Destroy(consumerCtx oc.ConsumerCtx) { + oe.DestroyPackage(ch.Config.External) + + ch.ConsumerHandlers.Destroy(consumerCtx) +} + +func (ch *ConsumerHandlers) OnMessage(msg *kafka.Message) { + +} + +func (ch *ConsumerHandlers) Validate() error { + if err := ch.ConsumerHandlers.Validate(); nil != err { + return err + } + + if nil != ch.validated.Load() { + return nil + } + ch.validated.Store(true) + + return nil +} diff --git a/consumer/consumer.go b/consumer/consumer.go new file mode 100644 index 0000000..6d26190 --- /dev/null +++ b/consumer/consumer.go @@ -0,0 +1,20 @@ +package consumer + +import ( + oc "git.loafle.net/overflow/consumer" + "git.loafle.net/overflow/consumer_metric/config" +) + +func New(_config *config.Config) *oc.Consumer { + ch := &ConsumerHandlers{ + Config: _config, + } + ch.Name = "Metric" + ch.ConsumerName = "metric" + + c := &oc.Consumer{ + ConsumerHandler: ch, + } + + return c +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..5ab7ed7 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,10 @@ +package: git.loafle.net/overflow/consumer_metric +import: +- package: git.loafle.net/commons/configuration-go +- package: git.loafle.net/commons/logging-go +- package: git.loafle.net/overflow/commons-go + subpackages: + - config/external +- package: github.com/segmentio/kafka-go +- package: git.loafle.net/overflow/consumer +- package: git.loafle.net/overflow/external-go diff --git a/main.go b/main.go new file mode 100644 index 0000000..b1ae948 --- /dev/null +++ b/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" + + "git.loafle.net/commons/configuration-go" + "git.loafle.net/commons/logging-go" + "git.loafle.net/overflow/consumer_metric/config" + "git.loafle.net/overflow/consumer_metric/consumer" +) + +var ( + configDir *string +) + +func init() { + configDir = flag.String("config-dir", "./", "Config directory") + logConfigPath := flag.String("log-config", "", "logging config path") + flag.Parse() + + logging.InitializeLogger(*logConfigPath) +} + +func main() { + defer logging.Logger().Sync() + + _config := &config.Config{} + configuration.SetConfigPath(*configDir) + if err := configuration.Load(_config, "config.json"); nil != err { + logging.Logger().Panic(err) + } + + c := consumer.New(_config) + + err := c.Start() + if nil != err { + log.Printf("err: %v", err) + os.Exit(1) + } + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, + syscall.SIGKILL, + syscall.SIGSTOP, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + <-interrupt + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Stop(ctx); err != nil { + logging.Logger().Errorf("error: %v", err) + } +}