This commit is contained in:
crusader 2017-10-31 19:27:26 +09:00
parent dc520fdb24
commit 9775e5ea4e
9 changed files with 202 additions and 10 deletions

34
client/client.go Normal file
View File

@ -0,0 +1,34 @@
package client
import (
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/server"
rpcClient "git.loafle.net/overflow/overflow_discovery/client/rpc"
)
func New(addr string, registry rpc.Registry) Client {
ch := NewClientHandler(addr, registry)
c := &client{}
c.Client = server.NewClient(ch)
c.ch = ch
return c
}
type Client interface {
server.Client
RPC() rpcClient.Client
}
type client struct {
server.Client
ch *ClientHandlers
}
func (c *client) RPC() rpcClient.Client {
return c.ch.RPCClient
}

47
client/client_handlers.go Normal file
View File

@ -0,0 +1,47 @@
package client
import (
"io"
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/server/ipc"
rpcClient "git.loafle.net/overflow/overflow_discovery/client/rpc"
)
func NewClientHandler(addr string, registry rpc.Registry) *ClientHandlers {
ch := &ClientHandlers{}
ch.Addr = addr
ch.RPCClient = rpcClient.New(registry)
return ch
}
type ClientHandlers struct {
ipc.ClientHandlers
RPCClient rpcClient.Client
}
func (ch *ClientHandlers) OnStart() {
// no op
}
func (ch *ClientHandlers) OnStop() {
// no op
}
func (ch *ClientHandlers) OnHandshake(remoteAddr string, rwc io.ReadWriteCloser) error {
return nil
}
func (ch *ClientHandlers) Handle(rwc io.ReadWriteCloser, stopChan chan struct{}) {
ch.RPCClient.Start(rwc)
select {
case <-stopChan:
rwc.Close()
return
default:
}
}

19
client/rpc/client.go Normal file
View File

@ -0,0 +1,19 @@
package rpc
import (
"git.loafle.net/commons_go/rpc"
rpcClient "git.loafle.net/commons_go/rpc/client"
)
func New(registry rpc.Registry) Client {
ch := NewClientHandler(registry)
c := rpcClient.New(ch)
return c
}
type Client interface {
rpcClient.Client
}

View File

@ -0,0 +1,28 @@
package rpc
import (
"git.loafle.net/commons_go/rpc"
rpcClient "git.loafle.net/commons_go/rpc/client"
"git.loafle.net/commons_go/rpc/protocol/json"
)
func NewClientHandler(registry rpc.Registry) *ClientHandlers {
ch := &ClientHandlers{}
ch.ContentType = "json"
ch.Codec = json.NewClientCodec()
ch.RPCRegistry = registry
return ch
}
type ClientHandlers struct {
rpcClient.ClientHandlers
}
func (ch *ClientHandlers) OnStart() {
// no op
}
func (ch *ClientHandlers) OnStop() {
// no op
}

View File

@ -9,7 +9,6 @@ import (
"syscall" "syscall"
cRPC "git.loafle.net/commons_go/rpc" cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/overflow/overflow_discovery/rpc" "git.loafle.net/overflow/overflow_discovery/rpc"
"git.loafle.net/overflow/overflow_discovery/server" "git.loafle.net/overflow/overflow_discovery/server"
) )
@ -26,7 +25,6 @@ func init() {
func main() { func main() {
registry := cRPC.NewRegistry() registry := cRPC.NewRegistry()
registry.RegisterCodec(json.NewCodec(), "json")
registry.RegisterService(new(rpc.DiscoveryService), "") registry.RegisterService(new(rpc.DiscoveryService), "")
s := server.New(*sockFile, registry) s := server.New(*sockFile, registry)

21
server/rpc/server.go Normal file
View File

@ -0,0 +1,21 @@
package rpc
import (
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
rpcServer "git.loafle.net/commons_go/rpc/server"
)
func New(registry rpc.Registry) Server {
sh := NewServerHandler(registry)
sh.RegisterCodec(json.NewServerCodec(), "json")
s := rpcServer.New(sh)
return s
}
type Server interface {
rpcServer.Server
}

View File

@ -0,0 +1,47 @@
package rpc
import (
"io"
"git.loafle.net/commons_go/rpc"
rpcServer "git.loafle.net/commons_go/rpc/server"
)
func NewServerHandler(registry rpc.Registry) *ServerHandlers {
sh := &ServerHandlers{}
sh.RPCRegistry = registry
return sh
}
type ServerHandlers struct {
rpcServer.ServerHandlers
}
func (sh *ServerHandlers) GetContentType(r io.Reader) string {
return "json"
}
func (sh *ServerHandlers) OnPreRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPostRead(r io.Reader) {
// no op
}
func (sh *ServerHandlers) OnPreWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPostWriteResult(w io.Writer, result interface{}) {
// no op
}
func (sh *ServerHandlers) OnPreWriteError(w io.Writer, err error) {
// no op
}
func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
// no op
}

View File

@ -8,7 +8,6 @@ import (
func New(addr string, registry rpc.Registry) server.Server { func New(addr string, registry rpc.Registry) server.Server {
sh := NewServerHandler(addr, registry) sh := NewServerHandler(addr, registry)
sh.workersChan = make(chan struct{}, 10)
s := server.NewServer(sh) s := server.NewServer(sh)

View File

@ -5,13 +5,13 @@ import (
"git.loafle.net/commons_go/rpc" "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/server/ipc" "git.loafle.net/commons_go/server/ipc"
rpcServer "git.loafle.net/overflow/overflow_discovery/server/rpc"
) )
func NewServerHandler(addr string, registry rpc.Registry) *ServerHandlers { func NewServerHandler(addr string, registry rpc.Registry) *ServerHandlers {
sh := &ServerHandlers{ sh := &ServerHandlers{}
registry: registry,
}
sh.Addr = addr sh.Addr = addr
sh.rpcServer = rpcServer.New(registry)
return sh return sh
} }
@ -19,22 +19,21 @@ func NewServerHandler(addr string, registry rpc.Registry) *ServerHandlers {
type ServerHandlers struct { type ServerHandlers struct {
ipc.ServerHandlers ipc.ServerHandlers
registry rpc.Registry rpcServer rpcServer.Server
workersChan chan struct{}
} }
func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) { func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) {
contentType := "json"
Loop: Loop:
for { for {
if err := sh.registry.Invoke(contentType, rwc, rwc, nil, nil); nil != err && sh.IsClientDisconnect(err) { if err := sh.rpcServer.Handle(rwc, rwc); nil != err && sh.IsClientDisconnect(err) {
stopChan <- struct{}{} stopChan <- struct{}{}
break Loop break Loop
} }
select { select {
case <-stopChan: case <-stopChan:
rwc.Close()
return return
default: default:
} }