rpc/server/server.go

90 lines
1.5 KiB
Go
Raw Normal View History

2017-10-31 09:25:44 +00:00
package server
import (
"io"
"sync"
"git.loafle.net/commons_go/rpc/protocol"
)
func New(sh ServerHandler) Server {
s := &server{
sh: sh,
}
return s
}
type Server interface {
Start()
Stop()
Handle(r io.Reader, w io.Writer) error
}
type server struct {
sh ServerHandler
stopChan chan struct{}
stopWg sync.WaitGroup
}
func (s *server) Start() {
if nil == s.sh {
panic("Server: server handler must be specified.")
}
s.sh.Validate()
if s.stopChan != nil {
panic("Server: server is already running. Stop it before starting it again")
}
s.stopChan = make(chan struct{})
}
func (s *server) Stop() {
if s.stopChan == nil {
panic("Server: server must be started before stopping it")
}
close(s.stopChan)
s.stopWg.Wait()
s.stopChan = nil
}
func (s *server) Handle(r io.Reader, w io.Writer) error {
contentType := s.sh.GetContentType(r)
codec, err := s.sh.getCodec(contentType)
if nil != err {
return err
}
var codecReq protocol.ServerCodecRequest
defer func() {
if nil != codecReq {
codecReq.Complete()
}
}()
s.sh.OnPreRead(r)
// Create a new codec request.
codecReq, errNew := codec.NewRequest(r)
if nil != errNew {
return errNew
}
s.sh.OnPostRead(r)
result, err := s.sh.invoke(codecReq)
if nil != err {
s.sh.OnPreWriteError(w, err)
codecReq.WriteError(w, 400, err)
s.sh.OnPostWriteError(w, err)
return nil
}
s.sh.OnPreWriteResult(w, result)
codecReq.WriteResponse(w, result)
s.sh.OnPostWriteResult(w, result)
return nil
}