package server import ( "fmt" "log" "net" "git.loafle.net/commons_go/logging" "git.loafle.net/overflow/overflow_discovery/discovery" "git.loafle.net/overflow/overflow_discovery/rpc/notify" crs "git.loafle.net/commons_go/rpc/server" "git.loafle.net/commons_go/server" ) func newServerHandler(addr string, rpcSH RPCServerHandler) ServerHandler { sh := &ServerHandlers{ addr: addr, rpcSH: rpcSH, } sh.Name = "Discovery" return sh } type ServerHandlers struct { server.ServerHandlers rpcSH RPCServerHandler addr string } func (sh *ServerHandlers) Init() error { return nil } func (sh *ServerHandlers) OnStart() { discovery.DiscoveryInit() } func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) { var err error if conn, err = sh.ServerHandlers.OnConnect(conn); nil != err { return nil, err } nConn := newConn(conn, "jsonrpc") notify.NotifyInit(nConn) return nConn, nil } func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) { defer func() { notify.NotifyDestroy() }() dConn := conn.(Conn) contentType := dConn.GetContentType() codec, err := sh.rpcSH.GetCodec(contentType) if nil != err { log.Printf("RPC Handle: %v", err) doneChan <- struct{}{} return } for { if err := crs.Handle(sh.rpcSH, codec, conn, conn); nil != err { if server.IsClientDisconnect(err) { doneChan <- struct{}{} return } log.Printf("RPC: %v", err) } select { case <-stopChan: return default: } } } func (sh *ServerHandlers) OnStop() { discovery.DiscoveryDestroy() } func (sh *ServerHandlers) Validate() { sh.ServerHandlers.Validate() if "" == sh.addr { logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified")) } if nil == sh.rpcSH { logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified")) } }