This commit is contained in:
crusader 2018-04-12 21:39:48 +09:00
parent e561dd5584
commit a0d3ff158a
5 changed files with 33 additions and 46 deletions

View File

@ -1,17 +1,18 @@
package main package main
import ( import (
"git.loafle.net/commons/logging-go" "context"
"flag"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time" "time"
"git.loafle.net/overflow/probe_gateway_rpc/server"
"context"
"flag"
"git.loafle.net/commons/configuration-go" "git.loafle.net/commons/configuration-go"
"git.loafle.net/commons/logging-go"
"git.loafle.net/overflow/member_gateway_rpc/config" "git.loafle.net/overflow/member_gateway_rpc/config"
"git.loafle.net/overflow/probe_gateway_rpc/server"
) )
var ( var (

View File

@ -1,6 +1,5 @@
package server package server
import ( import (
cs "git.loafle.net/commons/server-go" cs "git.loafle.net/commons/server-go"
oge "git.loafle.net/overflow/gateway/external" oge "git.loafle.net/overflow/gateway/external"
@ -19,7 +18,6 @@ type ServerHandlers struct {
Config *config.Config Config *config.Config
} }
func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error { func (sh *ServerHandlers) Init(serverCtx cs.ServerCtx) error {
if err := sh.ServerHandlers.Init(serverCtx); nil != err { if err := sh.ServerHandlers.Init(serverCtx); nil != err {
return err return err

View File

@ -13,7 +13,7 @@ func New(_config *config.Config) *cssw.Server {
sh := &ServerHandlers{ sh := &ServerHandlers{
ServerHandlers: *_config.ServerHandler, ServerHandlers: *_config.ServerHandler,
Config: _config, Config: _config,
} }
sh.RegisterServlet("/auth", as) sh.RegisterServlet("/auth", as)

View File

@ -1,26 +1,24 @@
package servlet package servlet
import ( import (
"sync"
"fmt"
"encoding/base64"
"context" "context"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt"
"sync"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
og "git.loafle.net/overflow/gateway" "git.loafle.net/commons/server-go/socket"
ocnc "git.loafle.net/overflow/commons-go/noauthprobe/constants" ocnc "git.loafle.net/overflow/commons-go/noauthprobe/constants"
ocnm "git.loafle.net/overflow/commons-go/noauthprobe/model" ocnm "git.loafle.net/overflow/commons-go/noauthprobe/model"
og "git.loafle.net/overflow/gateway"
"git.loafle.net/overflow/member_gateway_rpc/subscribe" "git.loafle.net/overflow/gateway/external/grpc"
ogs "git.loafle.net/overflow/gateway/subscribe" ogs "git.loafle.net/overflow/gateway/subscribe"
ogrs "git.loafle.net/overflow/gateway_rpc/servlet" ogrs "git.loafle.net/overflow/gateway_rpc/servlet"
"git.loafle.net/commons/logging-go" "git.loafle.net/overflow/member_gateway_rpc/subscribe"
"git.loafle.net/overflow/gateway/external/grpc"
"git.loafle.net/commons/server-go/socket"
) )
type AuthServlet interface { type AuthServlet interface {
@ -105,35 +103,27 @@ func (s *AuthServlets) Handshake(servletCtx server.ServletCtx, ctx *fasthttp.Req
return extHeader, nil return extHeader, nil
case ocnc.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect: case ocnc.HTTPRequestHeaderValue_NoAuthProbe_Method_Connect:
bTempProbeKey := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey) bTempProbeKey := ctx.Request.Header.Peek(ocnc.HTTPRequestHeaderKey_NoAuthProbe_TempProbeKey)
if nil == bTempProbeKey { if nil == bTempProbeKey {
return nil, fmt.Errorf("Noauth probe temp key is not an existing key", ) return nil, fmt.Errorf("Noauth probe temp key is not an existing key")
} }
tempProbeKey := string(bTempProbeKey)
grpcCTX := context.Background() grpcCTX := context.Background()
r, err := grpc.Exec(grpcCTX, "NoAuthProbeService.readByTempKey", string(bTempProbeKey)) _, err := grpc.Exec(grpcCTX, "NoAuthProbeService.readByTempKey", tempProbeKey)
if nil != err { if nil != err {
return nil, fmt.Errorf("grpc result error: %s", err.Error() ) return nil, fmt.Errorf("grpc result error: %s", err.Error())
} }
nap := &ocnm.NoAuthProbe{} servletCtx.SetAttribute(og.SessionIDKey, tempProbeKey)
err = json.Unmarshal([]byte(r), nap)
if nil != err {
return nil, fmt.Errorf("grpc result unMarshal Error: %s", err.Error())
}
servletCtx.SetAttribute(og.SessionIDKey, bTempProbeKey)
servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE) servletCtx.SetAttribute(og.SessionClientTypeKey, og.PROBE)
servletCtx.SetAttribute(og.SessionTargetIDKey, bTempProbeKey) servletCtx.SetAttribute(og.SessionTargetIDKey, tempProbeKey)
return nil, nil return nil, nil
default: default:
return nil, fmt.Errorf("Unexpected noauth probe httpRequestHeaderValue: %v", method) return nil, fmt.Errorf("Unexpected noauth probe httpRequestHeaderValue: %v", method)
} }
return nil, nil
} }
func (s *AuthServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) { func (s *AuthServlets) OnConnect(servletCtx server.ServletCtx, conn socket.Conn) {
@ -158,14 +148,14 @@ func (s *AuthServlets) OnDisconnect(servletCtx server.ServletCtx) {
func (s *AuthServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { func (s *AuthServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) {
for { for {
select { select {
case msg, ok := <- subscribeChan: case msg, ok := <-subscribeChan:
if !ok { if !ok {
return return
} }
switch msg.TargetType { switch msg.TargetType {
case ogs.PROBE: case ogs.PROBE:
for _, targetID := range msg.Targets { for _, targetID := range msg.Targets {
_connections := s.getProbeConnections(targetID) _connections := s.getProbeConnections(targetID)
if nil == _connections || 0 == len(_connections) { if nil == _connections || 0 == len(_connections) {
break break
@ -215,4 +205,4 @@ func releaseConnection(_connection *connection) {
_connection.servletCtx = nil _connection.servletCtx = nil
connectionPool.Put(_connection) connectionPool.Put(_connection)
} }

View File

@ -1,25 +1,23 @@
package servlet package servlet
import ( import (
"sync"
"fmt"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"sync"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"git.loafle.net/commons/logging-go"
"git.loafle.net/commons/server-go" "git.loafle.net/commons/server-go"
og "git.loafle.net/overflow/gateway" "git.loafle.net/commons/server-go/socket"
ocpc "git.loafle.net/overflow/commons-go/probe/constants" ocpc "git.loafle.net/overflow/commons-go/probe/constants"
ocpm "git.loafle.net/overflow/commons-go/probe/model" ocpm "git.loafle.net/overflow/commons-go/probe/model"
og "git.loafle.net/overflow/gateway"
"git.loafle.net/overflow/member_gateway_rpc/subscribe" "git.loafle.net/overflow/gateway/external/grpc"
ogs "git.loafle.net/overflow/gateway/subscribe" ogs "git.loafle.net/overflow/gateway/subscribe"
ogrs "git.loafle.net/overflow/gateway_rpc/servlet" ogrs "git.loafle.net/overflow/gateway_rpc/servlet"
"git.loafle.net/commons/logging-go" "git.loafle.net/overflow/member_gateway_rpc/subscribe"
"git.loafle.net/overflow/gateway/external/grpc"
"git.loafle.net/commons/server-go/socket"
) )
type ProbeServlet interface { type ProbeServlet interface {
@ -133,14 +131,14 @@ func (s *ProbeServlets) OnDisconnect(servletCtx server.ServletCtx) {
func (s *ProbeServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) { func (s *ProbeServlets) handleSubscribe(serverCtx server.ServerCtx, subscribeChan <-chan *ogs.Message) {
for { for {
select { select {
case msg, ok := <- subscribeChan: case msg, ok := <-subscribeChan:
if !ok { if !ok {
return return
} }
switch msg.TargetType { switch msg.TargetType {
case ogs.PROBE: case ogs.PROBE:
for _, targetID := range msg.Targets { for _, targetID := range msg.Targets {
_connections := s.getProbeConnections(targetID) _connections := s.getProbeConnections(targetID)
if nil == _connections || 0 == len(_connections) { if nil == _connections || 0 == len(_connections) {
break break
@ -190,4 +188,4 @@ func (s *ProbeServlets) getProbeConnections(targetID string) []*connection {
// _connection.servletCtx = nil // _connection.servletCtx = nil
// //
// connectionPool.Put(_connection) // connectionPool.Put(_connection)
//} //}