From d4664c368e58c8c74f8325d9a7ed688c69bb0b2e Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 11 May 2018 19:51:52 +0900 Subject: [PATCH] ing --- influxdb/influxdb.go | 112 +++++++++++++++++++++++++++++++++++++++++++ kafka/kafka.go | 28 +++++------ 2 files changed, 126 insertions(+), 14 deletions(-) create mode 100644 influxdb/influxdb.go diff --git a/influxdb/influxdb.go b/influxdb/influxdb.go new file mode 100644 index 0000000..7c694e4 --- /dev/null +++ b/influxdb/influxdb.go @@ -0,0 +1,112 @@ +package influxdb + +import ( + "git.loafle.net/commons/logging-go" + occe "git.loafle.net/overflow/commons-go/config/external" + "github.com/influxdata/influxdb/client/v2" +) + +var ( + clients map[string]client.Client + batchPoints map[string]client.BatchPoints +) + +func InitPackage(config *occe.InfluxDB) { + if nil == config { + return + } + + if nil != config.ClientConfigs && 0 < len(config.ClientConfigs) { + clients = make(map[string]client.Client) + for n, c := range config.ClientConfigs { + switch c.ClientType { + case "http": + hc := client.HTTPConfig{ + Addr: c.Address, + Username: c.Username, + Password: c.Password, + UserAgent: c.UserAgent, + Timeout: c.Timeout, + } + clnt, err := client.NewHTTPClient(hc) + if nil != err { + logging.Logger().Errorf("%v", err) + return + } + clients[n] = clnt + case "udp": + uc := client.UDPConfig{ + Addr: c.Address, + PayloadSize: c.PayloadSize, + } + clnt, err := client.NewUDPClient(uc) + if nil != err { + logging.Logger().Errorf("%v", err) + return + } + clients[n] = clnt + default: + logging.Logger().Errorf("Not Supported InfluxDB Client Type[%s]", c.ClientType) + return + } + } + } + + if nil != config.BatchPointsConfigs && 0 < len(config.BatchPointsConfigs) { + batchPoints = make(map[string]client.BatchPoints) + for n, c := range config.BatchPointsConfigs { + bc := client.BatchPointsConfig{ + Precision: c.Precision, + Database: c.Database, + RetentionPolicy: c.RetentionPolicy, + WriteConsistency: c.WriteConsistency, + } + bp, err := client.NewBatchPoints(bc) + if nil != err { + logging.Logger().Errorf("%v", err) + return + } + batchPoints[n] = bp + } + } +} + +func StartPackage(config *occe.InfluxDB) { + if nil == config { + return + } + +} + +func StopPackage(config *occe.InfluxDB) { + if nil == config { + return + } + +} + +func DestroyPackage(config *occe.InfluxDB) { + if nil == config { + return + } + + for _, c := range clients { + if err := c.Close(); nil != err { + logging.Logger().Errorf("%v", err) + } + } +} + +func ClientFor(name string) client.Client { + if nil == clients { + return nil + } + return clients[name] +} + +func BatchPointsFor(name string) client.BatchPoints { + if nil == batchPoints { + return nil + } + return batchPoints[name] +} diff --git a/kafka/kafka.go b/kafka/kafka.go index 92453d7..e888541 100644 --- a/kafka/kafka.go +++ b/kafka/kafka.go @@ -10,8 +10,8 @@ import ( ) var ( - kafkaWriters map[string]*kafka.Writer - kafkaReaders map[string]*kafka.Reader + writers map[string]*kafka.Writer + readers map[string]*kafka.Reader ) func InitPackage(config *occe.Kafka) { @@ -20,7 +20,7 @@ func InitPackage(config *occe.Kafka) { } if nil != config.Producers && 0 < len(config.Producers) { - kafkaWriters = make(map[string]*kafka.Writer) + writers = make(map[string]*kafka.Writer) for n, c := range config.Producers { wc := kafka.WriterConfig{ Brokers: c.Brokers, @@ -36,12 +36,12 @@ func InitPackage(config *occe.Kafka) { RequiredAcks: c.RequiredAcks, Async: c.Async, } - kafkaWriters[n] = kafka.NewWriter(wc) + writers[n] = kafka.NewWriter(wc) } } if nil != config.Consumers && 0 < len(config.Consumers) { - kafkaReaders = make(map[string]*kafka.Reader) + readers = make(map[string]*kafka.Reader) for n, c := range config.Consumers { rc := kafka.ReaderConfig{ Brokers: c.Brokers, @@ -58,7 +58,7 @@ func InitPackage(config *occe.Kafka) { RebalanceTimeout: c.RebalanceTimeout, RetentionTime: c.RetentionTime, } - kafkaReaders[n] = kafka.NewReader(rc) + readers[n] = kafka.NewReader(rc) } } } @@ -82,13 +82,13 @@ func DestroyPackage(config *occe.Kafka) { return } - for _, w := range kafkaWriters { + for _, w := range writers { if err := w.Close(); nil != err { logging.Logger().Errorf("%v", err) } } - for _, r := range kafkaReaders { + for _, r := range readers { if err := r.Close(); nil != err { logging.Logger().Errorf("%v", err) } @@ -96,25 +96,25 @@ func DestroyPackage(config *occe.Kafka) { } func Writer(name string) *kafka.Writer { - if nil == kafkaWriters { + if nil == writers { return nil } - return kafkaWriters[name] + return writers[name] } func Reader(name string) *kafka.Reader { - if nil == kafkaReaders { + if nil == readers { return nil } - return kafkaReaders[name] + return readers[name] } func Write(name string, key []byte, value []byte) error { - if nil == kafkaWriters { + if nil == writers { return fmt.Errorf("Kafka client is not valid") } - w, ok := kafkaWriters[name] + w, ok := writers[name] if !ok { return fmt.Errorf("Kafka client is not valid") }