From 9775e5ea4e52f02c2ea85a26e75e12cb667c8a65 Mon Sep 17 00:00:00 2001 From: crusader Date: Tue, 31 Oct 2017 19:27:26 +0900 Subject: [PATCH] ing --- client/client.go | 34 +++++++++++++++++++++++++ client/client_handlers.go | 47 +++++++++++++++++++++++++++++++++++ client/rpc/client.go | 19 ++++++++++++++ client/rpc/client_handlers.go | 28 +++++++++++++++++++++ main.go | 2 -- server/rpc/server.go | 21 ++++++++++++++++ server/rpc/server_handlers.go | 47 +++++++++++++++++++++++++++++++++++ server/server.go | 1 - server/server_handlers.go | 13 +++++----- 9 files changed, 202 insertions(+), 10 deletions(-) create mode 100644 client/client.go create mode 100644 client/client_handlers.go create mode 100644 client/rpc/client.go create mode 100644 client/rpc/client_handlers.go create mode 100644 server/rpc/server.go create mode 100644 server/rpc/server_handlers.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..4978c46 --- /dev/null +++ b/client/client.go @@ -0,0 +1,34 @@ +package client + +import ( + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/server" + rpcClient "git.loafle.net/overflow/overflow_discovery/client/rpc" +) + +func New(addr string, registry rpc.Registry) Client { + + ch := NewClientHandler(addr, registry) + + c := &client{} + c.Client = server.NewClient(ch) + c.ch = ch + + return c +} + +type Client interface { + server.Client + + RPC() rpcClient.Client +} + +type client struct { + server.Client + + ch *ClientHandlers +} + +func (c *client) RPC() rpcClient.Client { + return c.ch.RPCClient +} diff --git a/client/client_handlers.go b/client/client_handlers.go new file mode 100644 index 0000000..5956b27 --- /dev/null +++ b/client/client_handlers.go @@ -0,0 +1,47 @@ +package client + +import ( + "io" + + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/server/ipc" + rpcClient "git.loafle.net/overflow/overflow_discovery/client/rpc" +) + +func NewClientHandler(addr string, registry rpc.Registry) *ClientHandlers { + ch := &ClientHandlers{} + ch.Addr = addr + ch.RPCClient = rpcClient.New(registry) + + return ch +} + +type ClientHandlers struct { + ipc.ClientHandlers + + RPCClient rpcClient.Client +} + +func (ch *ClientHandlers) OnStart() { + // no op +} + +func (ch *ClientHandlers) OnStop() { + // no op +} + +func (ch *ClientHandlers) OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error { + return nil +} + +func (ch *ClientHandlers) Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) { + ch.RPCClient.Start(rwc) + + select { + case <-stopChan: + rwc.Close() + return + default: + } + +} diff --git a/client/rpc/client.go b/client/rpc/client.go new file mode 100644 index 0000000..69a70da --- /dev/null +++ b/client/rpc/client.go @@ -0,0 +1,19 @@ +package rpc + +import ( + "git.loafle.net/commons_go/rpc" + rpcClient "git.loafle.net/commons_go/rpc/client" +) + +func New(registry rpc.Registry) Client { + + ch := NewClientHandler(registry) + + c := rpcClient.New(ch) + + return c +} + +type Client interface { + rpcClient.Client +} diff --git a/client/rpc/client_handlers.go b/client/rpc/client_handlers.go new file mode 100644 index 0000000..04f7bbc --- /dev/null +++ b/client/rpc/client_handlers.go @@ -0,0 +1,28 @@ +package rpc + +import ( + "git.loafle.net/commons_go/rpc" + rpcClient "git.loafle.net/commons_go/rpc/client" + "git.loafle.net/commons_go/rpc/protocol/json" +) + +func NewClientHandler(registry rpc.Registry) *ClientHandlers { + ch := &ClientHandlers{} + ch.ContentType = "json" + ch.Codec = json.NewClientCodec() + ch.RPCRegistry = registry + + return ch +} + +type ClientHandlers struct { + rpcClient.ClientHandlers +} + +func (ch *ClientHandlers) OnStart() { + // no op +} + +func (ch *ClientHandlers) OnStop() { + // no op +} diff --git a/main.go b/main.go index 2dcca35..0f3f780 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "syscall" cRPC "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol/json" "git.loafle.net/overflow/overflow_discovery/rpc" "git.loafle.net/overflow/overflow_discovery/server" ) @@ -26,7 +25,6 @@ func init() { func main() { registry := cRPC.NewRegistry() - registry.RegisterCodec(json.NewCodec(), "json") registry.RegisterService(new(rpc.DiscoveryService), "") s := server.New(*sockFile, registry) diff --git a/server/rpc/server.go b/server/rpc/server.go new file mode 100644 index 0000000..4c45124 --- /dev/null +++ b/server/rpc/server.go @@ -0,0 +1,21 @@ +package rpc + +import ( + "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol/json" + rpcServer "git.loafle.net/commons_go/rpc/server" +) + +func New(registry rpc.Registry) Server { + + sh := NewServerHandler(registry) + sh.RegisterCodec(json.NewServerCodec(), "json") + + s := rpcServer.New(sh) + + return s +} + +type Server interface { + rpcServer.Server +} diff --git a/server/rpc/server_handlers.go b/server/rpc/server_handlers.go new file mode 100644 index 0000000..5911948 --- /dev/null +++ b/server/rpc/server_handlers.go @@ -0,0 +1,47 @@ +package rpc + +import ( + "io" + + "git.loafle.net/commons_go/rpc" + rpcServer "git.loafle.net/commons_go/rpc/server" +) + +func NewServerHandler(registry rpc.Registry) *ServerHandlers { + sh := &ServerHandlers{} + sh.RPCRegistry = registry + + return sh +} + +type ServerHandlers struct { + rpcServer.ServerHandlers +} + +func (sh *ServerHandlers) GetContentType(r io.Reader) string { + return "json" +} + +func (sh *ServerHandlers) OnPreRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPostRead(r io.Reader) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) { + // no op +} + +func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) { + // no op +} + +func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) { + // no op +} diff --git a/server/server.go b/server/server.go index 2cc0589..860006d 100644 --- a/server/server.go +++ b/server/server.go @@ -8,7 +8,6 @@ import ( func New(addr string, registry rpc.Registry) server.Server { sh := NewServerHandler(addr, registry) - sh.workersChan = make(chan struct{}, 10) s := server.NewServer(sh) diff --git a/server/server_handlers.go b/server/server_handlers.go index 77172f3..0aaac69 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -5,13 +5,13 @@ import ( "git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/server/ipc" + rpcServer "git.loafle.net/overflow/overflow_discovery/server/rpc" ) func NewServerHandler(addr string, registry rpc.Registry) *ServerHandlers { - sh := &ServerHandlers{ - registry: registry, - } + sh := &ServerHandlers{} sh.Addr = addr + sh.rpcServer = rpcServer.New(registry) return sh } @@ -19,22 +19,21 @@ func NewServerHandler(addr string, registry rpc.Registry) *ServerHandlers { type ServerHandlers struct { ipc.ServerHandlers - registry rpc.Registry - workersChan chan struct{} + rpcServer rpcServer.Server } func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) { - contentType := "json" Loop: for { - if err := sh.registry.Invoke(contentType, rwc, rwc, nil, nil); nil != err && sh.IsClientDisconnect(err) { + if err := sh.rpcServer.Handle(rwc, rwc); nil != err && sh.IsClientDisconnect(err) { stopChan <- struct{}{} break Loop } select { case <-stopChan: + rwc.Close() return default: }