This commit is contained in:
crusader 2018-05-11 12:54:30 +09:00
commit 82625bdaae
15 changed files with 509 additions and 0 deletions

68
.gitignore vendored Normal file
View File

@ -0,0 +1,68 @@
# Created by .ignore support plugin (hsz.mobi)
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/dictionaries
# Sensitive or high-churn files:
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.xml
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
# Gradle:
.idea/**/gradle.xml
.idea/**/libraries
# Mongo Explorer plugin:
.idea/**/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Go template
# Binaries for programs and plugins
*.exe
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
.idea/
*.iml
vendor/
glide.lock
.DS_Store
dist/
debug

32
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,32 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${workspaceRoot}/main.go",
"env": {},
"args": [],
"showLog": true
},
{
"name": "File Debug",
"type": "go",
"request": "launch",
"mode": "debug",
"remotePath": "",
"port": 2345,
"host": "127.0.0.1",
"program": "${fileDirname}",
"env": {},
"args": [],
"showLog": true
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
// Place your settings in this file to overwrite default and user settings.
{
}

23
Dockerfile Normal file
View File

@ -0,0 +1,23 @@
FROM alpine:3.7
# grab su-exec for easy step-down from root
RUN apk add --no-cache curl
COPY _docker/config/* /etc/overflow/config/
COPY _docker/bin/* /usr/local/overflow/bin/
COPY dist/probe_gateway_metric /usr/local/overflow/bin/
RUN mkdir -p /etc/overflow/config; \
mkdir -p /usr/local/overflow/bin; \
mkdir -p /var/overflow/logs; \
chmod +x /usr/local/overflow/bin/docker-entrypoint.sh /usr/local/overflow/bin/probe_gateway_metric;
ENV TINI_VERSION='0.17.0'
# Use tini as subreaper in Docker container to adopt zombie processes
RUN curl -fsSL https://github.com/krallin/tini/releases/download/v${TINI_VERSION}/tini-static-amd64 -o /bin/tini && chmod +x /bin/tini
EXPOSE 80
ENTRYPOINT ["/bin/tini", "--"]
CMD ["/usr/local/overflow/bin/docker-entrypoint.sh"]

View File

@ -0,0 +1,4 @@
#!/bin/sh
set -e
exec /usr/local/overflow/bin/probe_gateway_metric -config-dir=/etc/overflow/config

View File

@ -0,0 +1,29 @@
{
"serverHandler": {
"name": "Gateway Probe",
"network": "tcp4",
"address": ":80",
"concurrency": 262144,
"keepAlive": 60,
"handshakeTimeout": 60,
"maxMessageSize": 8192,
"readBufferSize": 4096,
"writeBufferSize": 4096,
"readTimeout": 0,
"writeTimeout": 0,
"pongTimeout": 60,
"pingTimeout": 10,
"pingPeriod": 9,
"enableCompression": false
},
"external": {
"grpc": {
"network": "tcp4",
"address": "192.168.1.50:50006"
},
"redis": {
"network": "tcp4",
"address": "192.168.1.50:6379"
}
}
}

8
build.sh Executable file
View File

@ -0,0 +1,8 @@
#!/bin/bash
rm ./dist
CGO_ENABLED=0 go build -a --installsuffix cgo --ldflags="-s" -o ./dist/probe_gateway_metric
docker build -t docker.loafle.net/overflow/probe_gateway_metric:1.0.0 .
docker push docker.loafle.net/overflow/probe_gateway_metric:1.0.0

35
config.json Normal file
View File

@ -0,0 +1,35 @@
{
"serverHandler": {
"name": "Gateway Metric",
"network": "tcp4",
"address": ":19095",
"concurrency": 262144,
"keepAlive": 60,
"handshakeTimeout": 60,
"maxMessageSize": 8192,
"readBufferSize": 4096,
"writeBufferSize": 4096,
"readTimeout": 0,
"writeTimeout": 0,
"pongTimeout": 60,
"pingTimeout": 10,
"pingPeriod": 9,
"enableCompression": false
},
"external": {
"grpc": {
"network": "tcp4",
"address": "192.168.1.50:50006"
},
"kafka": {
"producers": {
"metric": {
"brokers": [
"192.168.1.50:9092"
],
"topic": "overflow-metric-topic"
}
}
}
}
}

11
config/config.go Normal file
View File

@ -0,0 +1,11 @@
package config
import (
occe "git.loafle.net/overflow/commons-go/config/external"
ogrs "git.loafle.net/overflow/gateway_rpc/server"
)
type Config struct {
ServerHandler *ogrs.ServerHandlers `json:"serverHandler"`
External *occe.External `json:"external"`
}

11
docker-compose.yml Normal file
View File

@ -0,0 +1,11 @@
version: "3"
services:
probe_gateway_metric:
image: docker.loafle.net/overflow/probe_gateway_metric:1.0.0
container_name: probe_gateway_metric
# volumes:
# - /service/redis/data/var/redis:/data
# - /service/redis/data/usr/local/etc/redis/redis.conf:/usr/local/etc/redis/redis.conf
ports:
- "19091:80"

24
glide.yaml Normal file
View File

@ -0,0 +1,24 @@
package: git.loafle.net/overflow/probe_gateway_metric
import:
- package: git.loafle.net/commons/configuration-go
- package: git.loafle.net/commons/logging-go
- package: git.loafle.net/commons/server-go
subpackages:
- socket
- socket/web
- package: git.loafle.net/overflow/commons-go
subpackages:
- config/external
- config/probe
- package: git.loafle.net/overflow/gateway
subpackages:
- external
- external/grpc
- external/kafka
- external/redis
- subscribe/redis
- package: git.loafle.net/overflow/gateway_rpc
subpackages:
- server
- package: github.com/valyala/fasthttp
version: ^20160617.0.0

61
main.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.loafle.net/commons/configuration-go"
"git.loafle.net/commons/logging-go"
"git.loafle.net/overflow/probe_gateway_metric/config"
"git.loafle.net/overflow/probe_gateway_metric/server"
)
var (
configDir *string
)
func init() {
configDir = flag.String("config-dir", "./", "Config directory")
logConfigPath := flag.String("log-config", "", "logging config path")
flag.Parse()
logging.InitializeLogger(*logConfigPath)
}
func main() {
_config := &config.Config{}
configuration.SetConfigPath(*configDir)
if err := configuration.Load(_config, "config.json"); nil != err {
logging.Logger().Panic(err)
}
s := server.New(_config)
go func() {
err := s.ListenAndServe()
if nil != err {
log.Printf("err: %v", err)
}
}()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt,
syscall.SIGKILL,
syscall.SIGSTOP,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
<-interrupt
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
logging.Logger().Errorf("error: %v", err)
}
}

49
server/server-handler.go Normal file
View File

@ -0,0 +1,49 @@
package server
import (
cs "git.loafle.net/commons/server-go"
oge "git.loafle.net/overflow/gateway/external"
ogrs "git.loafle.net/overflow/gateway_rpc/server"
"git.loafle.net/overflow/probe_gateway_metric/config"
)
type ServerHandler interface {
ogrs.ServerHandler
}
type ServerHandlers struct {
ogrs.ServerHandlers
Config *config.Config
}
func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err
}
oge.InitPackage(sh.Config.External)
return nil
}
func (sh *ServerHandlers) OnStart(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.OnStart(serverCtx); nil != err {
return err
}
oge.StartPackage(sh.Config.External)
return nil
}
func (sh *ServerHandlers) OnStop(serverCtx cs.ServerCtx) {
oge.StopPackage(sh.Config.External)
sh.ServerHandlers.OnStop(serverCtx)
}
func (sh *ServerHandlers) Destroy(serverCtx cs.ServerCtx) {
oge.DestroyPackage(sh.Config.External)
sh.ServerHandlers.Destroy(serverCtx)
}

25
server/server.go Normal file
View File

@ -0,0 +1,25 @@
package server
import (
cssw "git.loafle.net/commons/server-go/socket/web"
"git.loafle.net/overflow/probe_gateway_metric/config"
"git.loafle.net/overflow/probe_gateway_metric/servlet"
)
func New(_config *config.Config) *cssw.Server {
ds := &servlet.DataServlets{}
sh := &ServerHandlers{
ServerHandlers: *_config.ServerHandler,
Config: _config,
}
sh.RegisterServlet("/data", ds)
s := &cssw.Server{
ServerHandler: sh,
}
return s
}

126
servlet/data-servlet.go Normal file
View File

@ -0,0 +1,126 @@
package servlet
import (
"context"
"fmt"
"github.com/valyala/fasthttp"
logging "git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go"
"git.loafle.net/commons/server-go/socket"
cssw "git.loafle.net/commons/server-go/socket/web"
occp "git.loafle.net/overflow/commons-go/config/probe"
og "git.loafle.net/overflow/gateway"
"git.loafle.net/overflow/gateway/external/grpc"
"git.loafle.net/overflow/gateway/external/kafka"
)
type DataServlet interface {
cssw.Servlet
}
type DataServlets struct {
cssw.Servlets
}
func (s *DataServlets) Init(serverCtx server.ServerCtx) error {
if err := s.Servlets.Init(serverCtx); nil != err {
return err
}
return nil
}
func (s *DataServlets) OnStart(serverCtx server.ServerCtx) error {
if err := s.Servlets.OnStart(serverCtx); nil != err {
return err
}
return nil
}
func (s *DataServlets) OnStop(serverCtx server.ServerCtx) {
s.Servlets.OnStop(serverCtx)
}
func (s *DataServlets) Destroy(serverCtx server.ServerCtx) {
s.Servlets.Destroy(serverCtx)
}
func (s *DataServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.RequestCtx) (*fasthttp.ResponseHeader, error) {
// probe key extraction
bMethod := ctx.Request.Header.Peek(occp.HTTPRequestHeaderKey_Probe_Method)
if nil == bMethod {
return nil, fmt.Errorf("Unexpected probe method: %v", bMethod)
}
method := string(bMethod)
switch method {
case occp.HTTPRequestHeaderValue_Probe_Method_Connect:
default:
return nil, fmt.Errorf("Unexpected probe method: %s", method)
}
bProbeKey := ctx.Request.Header.Peek(occp.HTTPRequestHeaderKey_Probe_ProbeKey)
if nil == bProbeKey {
return nil, fmt.Errorf("Unexpected probe key : %v", bProbeKey)
}
probeKey := string(bProbeKey)
grpcCTX := context.Background()
_, err := grpc.Exec(grpcCTX, "ProbeService.readByProbeKey", probeKey)
if nil != err {
return nil, fmt.Errorf("grpc call Error: %s", err.Error())
}
servletCtx.SetAttribute(og.SessionIDKey, probeKey)
servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE)
servletCtx.SetAttribute(og.SessionTargetIDKey, probeKey)
return nil, nil
}
func (s *DataServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
s.Servlets.OnConnect(servletCtx, conn)
}
func (s *DataServlets) OnDisconnect(servletCtx server.ServletCtx) {
s.Servlets.OnDisconnect(servletCtx)
}
func (s *DataServlets) Handle(servletCtx server.ServletCtx,
stopChan <-chan struct{}, doneChan chan<- struct{},
readChan <-chan []byte, writeChan chan<- []byte) {
defer func() {
doneChan <- struct{}{}
}()
var (
err error
)
for {
select {
case msg, ok := <-readChan:
if !ok {
return
}
err = kafka.Write("metric", msg, msg)
if nil != err {
logging.Logger().Error(err)
break
}
case <-stopChan:
return
}
}
}