package server import ( "net" "sync" cRPC "git.loafle.net/commons_go/rpc" crsrs "git.loafle.net/commons_go/rpc/server/rwc/socket" "git.loafle.net/commons_go/rpc/protocol/json" "git.loafle.net/commons_go/server" ) func newSocketHandler(rpcSH RPCServletHandler) SocketHandler { rpcRWCSH := crsrs.New() sh := &SocketHandlers{ rpcSH: rpcSH, rpcRWCSH: rpcRWCSH, } return sh } type SocketHandlers struct { server.SocketHandlers rpcRWCSH cRPC.ServletReadWriteCloseHandler rpcSH RPCServletHandler } func (sh *SocketHandlers) Init(serverCTX server.ServerContext) error { if err := sh.SocketHandlers.Init(serverCTX); nil != err { return err } return nil } func (sh *SocketHandlers) Handshake(socketCTX server.SocketContext, conn net.Conn) (id string) { return "discovery" } func (sh *SocketHandlers) OnConnect(soc server.Socket) { sh.SocketHandlers.OnConnect(soc) soc.Context().SetAttribute(cRPC.ContentTypeKey, json.Name) } func (sh *SocketHandlers) Handle(soc server.Socket, stopChan <-chan struct{}, doneChan chan<- error) { var err error rpcServlet := retainRPCServlet(sh.rpcSH, sh.rpcRWCSH) defer func() { releaseRPCServlet(rpcServlet) doneChan <- err }() rpcDoneChan := make(chan error, 1) if err = rpcServlet.Start(soc.Context(), soc, rpcDoneChan); nil != err { return } select { case err = <-rpcDoneChan: case <-stopChan: rpcServlet.Stop() <-rpcDoneChan } } func (sh *SocketHandlers) OnDisconnect(soc server.Socket) { sh.SocketHandlers.OnDisconnect(soc) } func (sh *SocketHandlers) Destroy() { sh.SocketHandlers.Destroy() } func (sh *SocketHandlers) Validate() { } var rpcServletPool sync.Pool func retainRPCServlet(sh RPCServletHandler, rpcRWCSH cRPC.ServletReadWriteCloseHandler) cRPC.Servlet { v := rpcServletPool.Get() if v == nil { return cRPC.NewServlet(sh, rpcRWCSH) } return v.(cRPC.Servlet) } func releaseRPCServlet(s cRPC.Servlet) { rpcServletPool.Put(s) }