This commit is contained in:
crusader 2017-11-15 15:59:43 +09:00
parent a471858025
commit bb465d3628
11 changed files with 188 additions and 48 deletions

View File

@ -6,3 +6,4 @@ import:
- package: git.loafle.net/commons_go/local_socket.git
- package: git.loafle.net/commons_go/server
- package: git.loafle.net/commons_go/rpc
- package: gopkg.in/natefinch/npipe.v2

15
main.go
View File

@ -8,8 +8,7 @@ import (
"os/signal"
"syscall"
cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/overflow/overflow_discovery/rpc"
"git.loafle.net/commons_go/logging"
"git.loafle.net/overflow/overflow_discovery/server"
)
@ -24,19 +23,16 @@ func init() {
}
func main() {
registry := cRPC.NewRegistry()
if err := registry.RegisterService(new(rpc.DiscoveryService), ""); nil != err {
panic(err)
}
defer logging.Logger().Sync()
s := server.New(*sockFile, registry)
s := server.New(*sockFile)
stop := make(chan os.Signal)
signal.Notify(stop, syscall.SIGINT)
go func() {
if err := s.Serve(); err != nil {
log.Fatalf("Cannot start rpc server: %s", err)
if err := s.Start(); nil != err {
log.Printf("Server: Start error %v", err)
return
}
}()
@ -45,6 +41,7 @@ func main() {
case signal := <-stop:
fmt.Printf("Got signal: %v\n", signal)
}
s.Stop()
}

View File

@ -1 +1,7 @@
package rpc
import "git.loafle.net/commons_go/rpc"
func RegisterRPC(registry rpc.Registry) {
registry.RegisterService(&DiscoveryService{}, "")
}

28
server/conn.go Normal file
View File

@ -0,0 +1,28 @@
package server
import "net"
func newConn(nconn net.Conn, contentType string) Conn {
c := &conn{
contentType: contentType,
}
c.Conn = nconn
return c
}
type Conn interface {
net.Conn
GetContentType() string
}
type conn struct {
net.Conn
contentType string
}
func (c *conn) GetContentType() string {
return c.contentType
}

View File

@ -0,0 +1,9 @@
package server
import (
"git.loafle.net/commons_go/rpc/server"
)
type RPCServerHandler interface {
server.ServerHandler
}

View File

@ -4,49 +4,58 @@ import (
"io"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/commons_go/rpc/server"
)
func NewRPCServerHandler(registry rpc.Registry) *RPCServerHandlers {
rpcSH := &RPCServerHandlers{}
rpcSH.RPCRegistry = registry
func newRPCServerHandler(rpcRegistry rpc.Registry) server.ServerHandler {
sh := &RPCServerHandlers{}
sh.RPCRegistry = rpcRegistry
rpcSH.RegisterCodec(json.NewServerCodec(), "json")
return rpcSH
return sh
}
type RPCServerHandlers struct {
server.RPCServerHandlers
addr string
server.ServerHandlers
}
func (sh *ServerHandlers) GetContentType(r io.Reader) string {
return "json"
func (sh *RPCServerHandlers) Init() error {
return nil
}
func (sh *ServerHandlers) OnPreRead(r io.Reader) {
func (sh *RPCServerHandlers) OnStart() {
// no op
}
func (sh *ServerHandlers) OnPostRead(r io.Reader) {
func (sh *RPCServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
func (sh *RPCServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
func (sh *RPCServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) {
func (sh *RPCServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
func (sh *RPCServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *RPCServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}
func (sh *RPCServerHandlers) OnStop() {
// no op
}
func (sh *RPCServerHandlers) Validate() {
sh.ServerHandlers.Validate()
}

View File

@ -1,15 +1,21 @@
package server
import (
"git.loafle.net/commons_go/rpc"
cr "git.loafle.net/commons_go/rpc"
crpj "git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/commons_go/server"
"git.loafle.net/overflow/overflow_discovery/rpc"
)
func New(addr string, registry rpc.Registry) server.Server {
func New(addr string) server.Server {
rpcRegistry := cr.NewRegistry()
rpc.RegisterRPC(rpcRegistry)
sh := NewServerHandler(addr)
rpcSH := NewRPCServerHandler(registry)
sh.RPCServerHandler = rpcSH
rpcSH := newRPCServerHandler(rpcRegistry)
rpcSH.RegisterCodec(crpj.NewServerCodec(), crpj.Name)
sh := newServerHandler(addr, rpcSH)
s := server.New(sh)

7
server/server_handler.go Normal file
View File

@ -0,0 +1,7 @@
package server
import "git.loafle.net/commons_go/server"
type ServerHandler interface {
server.ServerHandler
}

View File

@ -1,15 +1,22 @@
package server
import (
"fmt"
"log"
"net"
"os"
"git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/logging"
crs "git.loafle.net/commons_go/rpc/server"
"git.loafle.net/commons_go/server"
)
func NewServerHandler(addr string) *ServerHandlers {
sh := &ServerHandlers{}
sh.addr = addr
func newServerHandler(addr string, rpcSH RPCServerHandler) ServerHandler {
sh := &ServerHandlers{
addr: addr,
rpcSH: rpcSH,
}
sh.Name = "Discovery"
return sh
}
@ -17,26 +24,66 @@ func NewServerHandler(addr string) *ServerHandlers {
type ServerHandlers struct {
server.ServerHandlers
addr string
rpcSH RPCServerHandler
addr string
}
func (sh *ServerHandlers) Init() error {
return nil
}
func (sh *ServerHandlers) OnStart() {
// no op
}
func (sh *ServerHandlers) OnConnect(conn net.Conn) (net.Conn, error) {
var err error
if conn, err = sh.ServerHandlers.OnConnect(conn); nil != err {
return nil, err
}
return newConn(conn, "jsonrpc"), nil
}
func (sh *ServerHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
dConn := conn.(Conn)
contentType := dConn.GetContentType()
codec, err := sh.rpcSH.GetCodec(contentType)
if nil != err {
log.Printf("RPC Handle: %v", err)
doneChan <- struct{}{}
return
}
for {
if err := crs.Handle(sh.rpcSH, codec, conn, conn); nil != err {
if server.IsClientDisconnect(err) {
doneChan <- struct{}{}
return
}
log.Printf("RPC: %v", err)
}
select {
case <-stopChan:
return
default:
}
}
}
func (sh *ServerHandlers) OnStop() {
// no op
}
func (sh *ServerHandlers) Listen() (net.Listener, error) {
os.Remove(sh.addr)
l, err := net.ListenUnix("unix", &net.UnixAddr{Name: sh.addr, Net: "unix"})
if nil == err {
os.Chmod(sh.addr, 0777)
}
return l, err
}
func (sh *ServerHandlers) Validate() {
sh.ServerHandlers.Validate()
func (sh *ServerHandlers) OnAccept(conn net.Conn) (net.Conn, error) {
return conn, nil
if "" == sh.addr {
logging.Logger().Panic(fmt.Sprintf("Server: Address of server must be specified"))
}
if nil == sh.rpcSH {
logging.Logger().Panic(fmt.Sprintf("Server: RPC Server Handler must be specified"))
}
}

View File

@ -0,0 +1,15 @@
package server
import (
"net"
"os"
)
func (sh *ServerHandlers) Listen() (net.Listener, error) {
os.Remove(sh.addr)
l, err := net.ListenUnix("unix", &net.UnixAddr{Name: sh.addr, Net: "unix"})
if nil == err {
os.Chmod(sh.addr, 0777)
}
return l, err
}

View File

@ -0,0 +1,15 @@
package server
import (
"net"
"gopkg.in/natefinch/npipe.v2"
)
func (sh *ServerHandlers) Listen() (net.Listener, error) {
ln, err := npipe.Listen(`\\.\pipe\` + sh.addr)
if err != nil {
// handle error
}
return ln, err
}