ing
This commit is contained in:
parent
78c2f69893
commit
fda60823d6
|
@ -1,13 +0,0 @@
|
||||||
package ipc
|
|
||||||
|
|
||||||
import "git.loafle.net/commons_go/rpc"
|
|
||||||
|
|
||||||
type IPCAdapter struct {
|
|
||||||
registry rpc.Registry
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAdapter(registry rpc.Registry) *IPCAdapter {
|
|
||||||
return &IPCAdapter{
|
|
||||||
registry: registry,
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,14 +0,0 @@
|
||||||
package ipc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.loafle.net/commons_go/rpc"
|
|
||||||
"git.loafle.net/commons_go/server"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewServer(addr string, registry rpc.Registry) server.Server {
|
|
||||||
sh := NewServerHandler(addr, registry)
|
|
||||||
|
|
||||||
s := server.NewServer(sh)
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
|
|
@ -1,32 +0,0 @@
|
||||||
package ipc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc"
|
|
||||||
"git.loafle.net/commons_go/server"
|
|
||||||
"git.loafle.net/commons_go/server/ipc"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewServerHandler(addr string, registry rpc.Registry) ServerHandler {
|
|
||||||
sh := &ServerHandlers{
|
|
||||||
registry: registry,
|
|
||||||
}
|
|
||||||
sh.Addr = addr
|
|
||||||
|
|
||||||
return sh
|
|
||||||
}
|
|
||||||
|
|
||||||
type ServerHandler interface {
|
|
||||||
server.ServerHandler
|
|
||||||
}
|
|
||||||
|
|
||||||
type ServerHandlers struct {
|
|
||||||
ipc.ServerHandlers
|
|
||||||
|
|
||||||
registry rpc.Registry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) {
|
|
||||||
|
|
||||||
}
|
|
106
server/server.go
106
server/server.go
|
@ -1,89 +1,67 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"log"
|
||||||
"sync"
|
"net"
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
"git.loafle.net/commons_go/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
func New(sh ServerHandler) Server {
|
func New(sh ServerHandler) Server {
|
||||||
s := &server{
|
lsh := &serverHandlers{}
|
||||||
sh: sh,
|
lsh.ServerHandler = sh
|
||||||
}
|
lsh.lsh = sh
|
||||||
|
s := server.NewServer(lsh)
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
type Server interface {
|
type serverHandlers struct {
|
||||||
Start()
|
server.ServerHandler
|
||||||
Stop()
|
lsh ServerHandler
|
||||||
Handle(r io.Reader, w io.Writer) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type server struct {
|
func (sh *serverHandlers) Handle(conn net.Conn, stopChan <-chan struct{}, doneChan chan<- struct{}) {
|
||||||
sh ServerHandler
|
contentType := sh.lsh.GetContentType(conn)
|
||||||
|
codec, err := sh.lsh.getCodec(contentType)
|
||||||
stopChan chan struct{}
|
|
||||||
stopWg sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) Start() {
|
|
||||||
if nil == s.sh {
|
|
||||||
panic("Server: server handler must be specified.")
|
|
||||||
}
|
|
||||||
s.sh.Validate()
|
|
||||||
|
|
||||||
if s.stopChan != nil {
|
|
||||||
panic("Server: server is already running. Stop it before starting it again")
|
|
||||||
}
|
|
||||||
s.stopChan = make(chan struct{})
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) Stop() {
|
|
||||||
if s.stopChan == nil {
|
|
||||||
panic("Server: server must be started before stopping it")
|
|
||||||
}
|
|
||||||
close(s.stopChan)
|
|
||||||
s.stopWg.Wait()
|
|
||||||
s.stopChan = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *server) Handle(r io.Reader, w io.Writer) error {
|
|
||||||
contentType := s.sh.GetContentType(r)
|
|
||||||
codec, err := s.sh.getCodec(contentType)
|
|
||||||
if nil != err {
|
if nil != err {
|
||||||
return err
|
log.Printf("RPC Handle: %v", err)
|
||||||
|
doneChan <- struct{}{}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var codecReq protocol.ServerCodecRequest
|
var codecReq protocol.ServerCodecRequest
|
||||||
|
|
||||||
defer func() {
|
Loop:
|
||||||
if nil != codecReq {
|
for {
|
||||||
codecReq.Complete()
|
sh.lsh.OnPreRead(conn)
|
||||||
|
// Create a new codec request.
|
||||||
|
codecReq, errNew := codec.NewRequest(conn)
|
||||||
|
if nil != errNew {
|
||||||
|
log.Printf("RPC Handle: %v", errNew)
|
||||||
|
doneChan <- struct{}{}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}()
|
sh.lsh.OnPostRead(conn)
|
||||||
|
|
||||||
s.sh.OnPreRead(r)
|
result, err := sh.lsh.invoke(codecReq)
|
||||||
// Create a new codec request.
|
|
||||||
codecReq, errNew := codec.NewRequest(r)
|
|
||||||
if nil != errNew {
|
|
||||||
return errNew
|
|
||||||
}
|
|
||||||
s.sh.OnPostRead(r)
|
|
||||||
|
|
||||||
result, err := s.sh.invoke(codecReq)
|
if nil != err {
|
||||||
|
sh.lsh.OnPreWriteError(conn, err)
|
||||||
|
codecReq.WriteError(conn, 400, err)
|
||||||
|
sh.lsh.OnPostWriteError(conn, err)
|
||||||
|
} else {
|
||||||
|
sh.lsh.OnPreWriteResult(conn, result)
|
||||||
|
codecReq.WriteResponse(conn, result)
|
||||||
|
sh.lsh.OnPostWriteResult(conn, result)
|
||||||
|
}
|
||||||
|
|
||||||
if nil != err {
|
select {
|
||||||
s.sh.OnPreWriteError(w, err)
|
case <-stopChan:
|
||||||
codecReq.WriteError(w, 400, err)
|
return
|
||||||
s.sh.OnPostWriteError(w, err)
|
default:
|
||||||
return nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.sh.OnPreWriteResult(w, result)
|
|
||||||
codecReq.WriteResponse(w, result)
|
|
||||||
s.sh.OnPostWriteResult(w, result)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,12 +4,13 @@ import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
"git.loafle.net/commons_go/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerHandler interface {
|
type ServerHandler interface {
|
||||||
RegisterCodec(codec protocol.ServerCodec, contentType string)
|
server.ServerHandler
|
||||||
|
|
||||||
GetContentType(r io.Reader) string
|
GetContentType(r io.Reader) string
|
||||||
|
RegisterCodec(codec protocol.ServerCodec, contentType string)
|
||||||
|
|
||||||
OnPreRead(r io.Reader)
|
OnPreRead(r io.Reader)
|
||||||
OnPostRead(r io.Reader)
|
OnPostRead(r io.Reader)
|
||||||
|
@ -22,6 +23,4 @@ type ServerHandler interface {
|
||||||
|
|
||||||
getCodec(contentType string) (protocol.ServerCodec, error)
|
getCodec(contentType string) (protocol.ServerCodec, error)
|
||||||
invoke(codec protocol.RegistryCodec) (result interface{}, err error)
|
invoke(codec protocol.RegistryCodec) (result interface{}, err error)
|
||||||
|
|
||||||
Validate()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,12 +7,14 @@ import (
|
||||||
|
|
||||||
"git.loafle.net/commons_go/rpc"
|
"git.loafle.net/commons_go/rpc"
|
||||||
"git.loafle.net/commons_go/rpc/protocol"
|
"git.loafle.net/commons_go/rpc/protocol"
|
||||||
|
"git.loafle.net/commons_go/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerHandlers struct {
|
type ServerHandlers struct {
|
||||||
RPCRegistry rpc.Registry
|
server.ServerHandlers
|
||||||
|
|
||||||
codecs map[string]protocol.ServerCodec
|
RPCRegistry rpc.Registry
|
||||||
|
codecs map[string]protocol.ServerCodec
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterCodec adds a new codec to the server.
|
// RegisterCodec adds a new codec to the server.
|
||||||
|
@ -56,6 +58,8 @@ func (sh *ServerHandlers) OnPostWriteError(w io.Writer, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *ServerHandlers) Validate() {
|
func (sh *ServerHandlers) Validate() {
|
||||||
|
sh.ServerHandlers.Validate()
|
||||||
|
|
||||||
if nil == sh.RPCRegistry {
|
if nil == sh.RPCRegistry {
|
||||||
panic("RPCRegistry must be specified.")
|
panic("RPCRegistry must be specified.")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user