diff --git a/const.go b/const.go index 5275db1..3b11547 100644 --- a/const.go +++ b/const.go @@ -1,9 +1,6 @@ package gateway import ( - "encoding/json" - "fmt" - cuc "git.loafle.net/commons/util-go/context" ) @@ -12,55 +9,4 @@ const ( SessionIDKey = cuc.ContextKey("SessionID") SessionTargetIDKey = cuc.ContextKey("SessionTargetID") SessionWriteChanKey = cuc.ContextKey("SessionWriteChan") - - GRPCClientTypeKey = cuc.ContextKey("OVERFLOW_GRPC_CLIENT_TYPE") - GRPCSessionIDKey = cuc.ContextKey("OVERFLOW_GRPC_SESSION_ID") - GRPCTargetIDKey = cuc.ContextKey("OVERFLOW_GRPC_TARGET_ID") ) - -type ClientType int - -const ( - MEMBER ClientType = iota - PROBE -) - -var ( - clientTypeID = map[ClientType]string{ - MEMBER: "MEMBER", - PROBE: "PROBE", - } - - clientTypeName = map[string]ClientType{ - "MEMBER": MEMBER, - "PROBE": PROBE, - } -) - -func (ct ClientType) String() string { - return clientTypeID[ct] -} - -func (ct ClientType) MarshalJSON() ([]byte, error) { - value, ok := clientTypeID[ct] - if !ok { - return nil, fmt.Errorf("Invalid EnumType[%s] value", ct) - } - return json.Marshal(value) -} - -func (ct ClientType) UnmarshalJSON(b []byte) error { - // unmarshal as string - var s string - err := json.Unmarshal(b, &s) - if err != nil { - return err - } - - value, ok := clientTypeName[s] - if !ok { - return fmt.Errorf("Invalid EnumType[%s] value", s) - } - ct = value - return nil -} diff --git a/external/external.go b/external/external.go deleted file mode 100644 index 04a6d24..0000000 --- a/external/external.go +++ /dev/null @@ -1,44 +0,0 @@ -package external - -import ( - occe "git.loafle.net/overflow/commons-go/config/external" - "git.loafle.net/overflow/gateway/external/grpc" - "git.loafle.net/overflow/gateway/external/kafka" - "git.loafle.net/overflow/gateway/external/redis" -) - -func InitPackage(config *occe.External) { - if nil == config { - return - } - grpc.InitPackage(config.GRPC) - redis.InitPackage(config.Redis) - kafka.InitPackage(config.Kafka) -} - -func StartPackage(config *occe.External) { - if nil == config { - return - } - grpc.StartPackage(config.GRPC) - redis.StartPackage(config.Redis) - kafka.StartPackage(config.Kafka) -} - -func StopPackage(config *occe.External) { - if nil == config { - return - } - grpc.StopPackage(config.GRPC) - redis.StopPackage(config.Redis) - kafka.StopPackage(config.Kafka) -} - -func DestroyPackage(config *occe.External) { - if nil == config { - return - } - grpc.DestroyPackage(config.GRPC) - redis.DestroyPackage(config.Redis) - kafka.DestroyPackage(config.Kafka) -} diff --git a/external/grpc/grpc.go b/external/grpc/grpc.go deleted file mode 100644 index 7144eae..0000000 --- a/external/grpc/grpc.go +++ /dev/null @@ -1,74 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - "strings" - "sync" - - "git.loafle.net/commons/logging-go" - oci "git.loafle.net/overflow/central-api/golang" - occe "git.loafle.net/overflow/commons-go/config/external" - "google.golang.org/grpc" -) - -var grpcClient oci.CentralAPIClient - -func InitPackage(config *occe.GRPC) { - if nil == config { - return - } - - conn, err := grpc.Dial(config.Address, grpc.WithInsecure()) - if nil != err { - logging.Logger().Panic(err) - } - grpcClient = oci.NewCentralAPIClient(conn) - logging.Logger().Infof("GRPC: connected to %s", config.Address) -} - -func StartPackage(config *occe.GRPC) { - if nil == config { - return - } -} - -func StopPackage(config *occe.GRPC) { - if nil == config { - return - } - -} - -func DestroyPackage(config *occe.GRPC) { - if nil == config { - return - } - -} - -var execMtx sync.RWMutex - -func Exec(ctx context.Context, method string, params ...string) (string, error) { - if nil == grpcClient { - return "", fmt.Errorf("GRPC Client is not initialized") - } - - var err error - - sm := strings.Split(method, ".") - si := &oci.ServerInput{ - Target: sm[0], - Method: sm[1], - Params: params, - } - - execMtx.Lock() - defer execMtx.Unlock() - so, err := grpcClient.(oci.CentralAPIClient).Exec(ctx, si) - if nil != err { - return "", err - } - - return so.Result, nil -} diff --git a/external/kafka/kafka.go b/external/kafka/kafka.go deleted file mode 100644 index 73fd5d9..0000000 --- a/external/kafka/kafka.go +++ /dev/null @@ -1,89 +0,0 @@ -package kafka - -import ( - "context" - "fmt" - - "git.loafle.net/commons/logging-go" - occe "git.loafle.net/overflow/commons-go/config/external" - "github.com/segmentio/kafka-go" -) - -var ( - kafkaWriters map[string]*kafka.Writer -) - -func InitPackage(config *occe.Kafka) { - if nil == config { - return - } - if nil == config.Producers || 0 == len(config.Producers) { - return - } - - kafkaWriters = make(map[string]*kafka.Writer) - - for n, c := range config.Producers { - wc := kafka.WriterConfig{ - Brokers: c.Brokers, - Topic: c.Topic, - Balancer: &kafka.LeastBytes{}, - MaxAttempts: c.MaxAttempts, - QueueCapacity: c.QueueCapacity, - BatchSize: c.BatchSize, - BatchTimeout: c.BatchTimeout, - ReadTimeout: c.ReadTimeout, - WriteTimeout: c.WriteTimeout, - RebalanceInterval: c.RebalanceInterval, - RequiredAcks: c.RequiredAcks, - Async: c.Async, - } - kafkaWriters[n] = kafka.NewWriter(wc) - } -} - -func StartPackage(config *occe.Kafka) { - if nil == config { - return - } - -} - -func StopPackage(config *occe.Kafka) { - if nil == config { - return - } - -} - -func DestroyPackage(config *occe.Kafka) { - if nil == config { - return - } - - for _, w := range kafkaWriters { - if err := w.Close(); nil != err { - logging.Logger().Errorf("%v", err) - } - } -} - -func Write(name string, key []byte, value []byte) error { - if nil == kafkaWriters { - return fmt.Errorf("Kafka client is not valid") - } - - w, ok := kafkaWriters[name] - if !ok { - return fmt.Errorf("Kafka client is not valid") - } - - err := w.WriteMessages(context.Background(), - kafka.Message{ - Key: key, - Value: value, - }, - ) - - return err -} diff --git a/external/redis/redis.go b/external/redis/redis.go deleted file mode 100644 index 90c9f11..0000000 --- a/external/redis/redis.go +++ /dev/null @@ -1,48 +0,0 @@ -package redis - -import ( - "git.loafle.net/commons/logging-go" - occe "git.loafle.net/overflow/commons-go/config/external" - "github.com/gomodule/redigo/redis" -) - -var Pool *redis.Pool - -func InitPackage(config *occe.Redis) { - if nil == config { - return - } - - Pool = &redis.Pool{ - MaxIdle: 1, - MaxActive: 3, - IdleTimeout: 240, - Wait: true, - MaxConnLifetime: 1, - Dial: func() (redis.Conn, error) { - return redis.Dial(config.Network, config.Address) - }, - } - logging.Logger().Infof("Redis: initialized [%s:%s]", config.Network, config.Address) -} - -func StartPackage(config *occe.Redis) { - if nil == config { - return - } -} - -func StopPackage(config *occe.Redis) { - if nil == config { - return - } - -} - -func DestroyPackage(config *occe.Redis) { - if nil == config { - return - } - - Pool.Close() -} diff --git a/glide.yaml b/glide.yaml index 8fa27f8..feae15d 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,10 +1,10 @@ package: git.loafle.net/overflow/gateway import: - package: git.loafle.net/commons/logging-go -- package: google.golang.org/grpc - version: ^1.11.2 +- package: git.loafle.net/commons/util-go + subpackages: + - context - package: github.com/gomodule/redigo version: ^2.0.0 -- package: github.com/segmentio/kafka-go -- package: git.loafle.net/commons/util-go -- package: git.loafle.net/overflow/central-api + subpackages: + - redis diff --git a/subscribe/redis/redis.go b/subscribe/redis/redis.go index 52568c0..9d7505c 100644 --- a/subscribe/redis/redis.go +++ b/subscribe/redis/redis.go @@ -5,7 +5,7 @@ import ( "fmt" "sync" - logging "git.loafle.net/commons/logging-go" + "git.loafle.net/commons/logging-go" "git.loafle.net/overflow/gateway/subscribe" "github.com/gomodule/redigo/redis" )