commit d8712413756f24a1945ddfee06f1cf37ec286b74 Author: crusader Date: Fri May 11 16:46:06 2018 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3733e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +.idea/ +*.iml + +vendor/ +glide.lock +.DS_Store +dist/ +debug diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2ca2b1d --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,32 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}/main.go", + "env": {}, + "args": [], + "showLog": true + }, + { + "name": "File Debug", + "type": "go", + "request": "launch", + "mode": "debug", + "remotePath": "", + "port": 2345, + "host": "127.0.0.1", + "program": "${fileDirname}", + "env": {}, + "args": [], + "showLog": true + } + + ] +} \ No newline at end of file diff --git a/const.go b/const.go new file mode 100644 index 0000000..e1206ea --- /dev/null +++ b/const.go @@ -0,0 +1,64 @@ +package external + +import ( + "encoding/json" + "fmt" + + cuc "git.loafle.net/commons/util-go/context" +) + +const ( + GRPCClientTypeKey = cuc.ContextKey("OVERFLOW_GRPC_CLIENT_TYPE") + GRPCSessionIDKey = cuc.ContextKey("OVERFLOW_GRPC_SESSION_ID") + GRPCTargetIDKey = cuc.ContextKey("OVERFLOW_GRPC_TARGET_ID") +) + +type ClientType int + +const ( + MEMBER ClientType = iota + PROBE + CONSUMER +) + +var ( + clientTypeID = map[ClientType]string{ + MEMBER: "MEMBER", + PROBE: "PROBE", + CONSUMER: "CONSUMER", + } + + clientTypeName = map[string]ClientType{ + "MEMBER": MEMBER, + "PROBE": PROBE, + "CONSUMER": CONSUMER, + } +) + +func (ct ClientType) String() string { + return clientTypeID[ct] +} + +func (ct ClientType) MarshalJSON() ([]byte, error) { + value, ok := clientTypeID[ct] + if !ok { + return nil, fmt.Errorf("Invalid EnumType[%s] value", ct) + } + return json.Marshal(value) +} + +func (ct ClientType) UnmarshalJSON(b []byte) error { + // unmarshal as string + var s string + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + + value, ok := clientTypeName[s] + if !ok { + return fmt.Errorf("Invalid EnumType[%s] value", s) + } + ct = value + return nil +} diff --git a/external.go b/external.go new file mode 100644 index 0000000..f39655d --- /dev/null +++ b/external.go @@ -0,0 +1,44 @@ +package external + +import ( + occe "git.loafle.net/overflow/commons-go/config/external" + "git.loafle.net/overflow/external-go/grpc" + "git.loafle.net/overflow/external-go/kafka" + "git.loafle.net/overflow/external-go/redis" +) + +func InitPackage(config *occe.External) { + if nil == config { + return + } + grpc.InitPackage(config.GRPC) + redis.InitPackage(config.Redis) + kafka.InitPackage(config.Kafka) +} + +func StartPackage(config *occe.External) { + if nil == config { + return + } + grpc.StartPackage(config.GRPC) + redis.StartPackage(config.Redis) + kafka.StartPackage(config.Kafka) +} + +func StopPackage(config *occe.External) { + if nil == config { + return + } + grpc.StopPackage(config.GRPC) + redis.StopPackage(config.Redis) + kafka.StopPackage(config.Kafka) +} + +func DestroyPackage(config *occe.External) { + if nil == config { + return + } + grpc.DestroyPackage(config.GRPC) + redis.DestroyPackage(config.Redis) + kafka.DestroyPackage(config.Kafka) +} diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..a3f0616 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,19 @@ +package: git.loafle.net/overflow/external-go +import: +- package: git.loafle.net/commons/logging-go +- package: git.loafle.net/commons/util-go + subpackages: + - context +- package: git.loafle.net/overflow/central-api + subpackages: + - golang +- package: git.loafle.net/overflow/commons-go + subpackages: + - config/external +- package: github.com/gomodule/redigo + version: ^2.0.0 + subpackages: + - redis +- package: github.com/segmentio/kafka-go +- package: google.golang.org/grpc + version: ^1.12.0 diff --git a/grpc/grpc.go b/grpc/grpc.go new file mode 100644 index 0000000..7144eae --- /dev/null +++ b/grpc/grpc.go @@ -0,0 +1,74 @@ +package grpc + +import ( + "context" + "fmt" + "strings" + "sync" + + "git.loafle.net/commons/logging-go" + oci "git.loafle.net/overflow/central-api/golang" + occe "git.loafle.net/overflow/commons-go/config/external" + "google.golang.org/grpc" +) + +var grpcClient oci.CentralAPIClient + +func InitPackage(config *occe.GRPC) { + if nil == config { + return + } + + conn, err := grpc.Dial(config.Address, grpc.WithInsecure()) + if nil != err { + logging.Logger().Panic(err) + } + grpcClient = oci.NewCentralAPIClient(conn) + logging.Logger().Infof("GRPC: connected to %s", config.Address) +} + +func StartPackage(config *occe.GRPC) { + if nil == config { + return + } +} + +func StopPackage(config *occe.GRPC) { + if nil == config { + return + } + +} + +func DestroyPackage(config *occe.GRPC) { + if nil == config { + return + } + +} + +var execMtx sync.RWMutex + +func Exec(ctx context.Context, method string, params ...string) (string, error) { + if nil == grpcClient { + return "", fmt.Errorf("GRPC Client is not initialized") + } + + var err error + + sm := strings.Split(method, ".") + si := &oci.ServerInput{ + Target: sm[0], + Method: sm[1], + Params: params, + } + + execMtx.Lock() + defer execMtx.Unlock() + so, err := grpcClient.(oci.CentralAPIClient).Exec(ctx, si) + if nil != err { + return "", err + } + + return so.Result, nil +} diff --git a/kafka/kafka.go b/kafka/kafka.go new file mode 100644 index 0000000..73fd5d9 --- /dev/null +++ b/kafka/kafka.go @@ -0,0 +1,89 @@ +package kafka + +import ( + "context" + "fmt" + + "git.loafle.net/commons/logging-go" + occe "git.loafle.net/overflow/commons-go/config/external" + "github.com/segmentio/kafka-go" +) + +var ( + kafkaWriters map[string]*kafka.Writer +) + +func InitPackage(config *occe.Kafka) { + if nil == config { + return + } + if nil == config.Producers || 0 == len(config.Producers) { + return + } + + kafkaWriters = make(map[string]*kafka.Writer) + + for n, c := range config.Producers { + wc := kafka.WriterConfig{ + Brokers: c.Brokers, + Topic: c.Topic, + Balancer: &kafka.LeastBytes{}, + MaxAttempts: c.MaxAttempts, + QueueCapacity: c.QueueCapacity, + BatchSize: c.BatchSize, + BatchTimeout: c.BatchTimeout, + ReadTimeout: c.ReadTimeout, + WriteTimeout: c.WriteTimeout, + RebalanceInterval: c.RebalanceInterval, + RequiredAcks: c.RequiredAcks, + Async: c.Async, + } + kafkaWriters[n] = kafka.NewWriter(wc) + } +} + +func StartPackage(config *occe.Kafka) { + if nil == config { + return + } + +} + +func StopPackage(config *occe.Kafka) { + if nil == config { + return + } + +} + +func DestroyPackage(config *occe.Kafka) { + if nil == config { + return + } + + for _, w := range kafkaWriters { + if err := w.Close(); nil != err { + logging.Logger().Errorf("%v", err) + } + } +} + +func Write(name string, key []byte, value []byte) error { + if nil == kafkaWriters { + return fmt.Errorf("Kafka client is not valid") + } + + w, ok := kafkaWriters[name] + if !ok { + return fmt.Errorf("Kafka client is not valid") + } + + err := w.WriteMessages(context.Background(), + kafka.Message{ + Key: key, + Value: value, + }, + ) + + return err +} diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..90c9f11 --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,48 @@ +package redis + +import ( + "git.loafle.net/commons/logging-go" + occe "git.loafle.net/overflow/commons-go/config/external" + "github.com/gomodule/redigo/redis" +) + +var Pool *redis.Pool + +func InitPackage(config *occe.Redis) { + if nil == config { + return + } + + Pool = &redis.Pool{ + MaxIdle: 1, + MaxActive: 3, + IdleTimeout: 240, + Wait: true, + MaxConnLifetime: 1, + Dial: func() (redis.Conn, error) { + return redis.Dial(config.Network, config.Address) + }, + } + logging.Logger().Infof("Redis: initialized [%s:%s]", config.Network, config.Address) +} + +func StartPackage(config *occe.Redis) { + if nil == config { + return + } +} + +func StopPackage(config *occe.Redis) { + if nil == config { + return + } + +} + +func DestroyPackage(config *occe.Redis) { + if nil == config { + return + } + + Pool.Close() +}