This commit is contained in:
crusader 2017-10-27 15:01:09 +09:00
parent 97acdddce5
commit dc520fdb24
7 changed files with 73 additions and 78 deletions

91
main.go
View File

@ -1,75 +1,52 @@
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
cRPC "git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/overflow/overflow_discovery/rpc"
"git.loafle.net/overflow/overflow_discovery/server"
)
var (
sockFile *string
)
func init() {
sockFile = flag.String("sock", serverAddr, "Socket file")
flag.Parse()
}
func main() {
// s := local_socket.NewLocalServer("discovery")
// s.SetOnConnectionCallback(func(socket net.Conn) {
// defer socket.Close()
// var wg sync.WaitGroup
// wg.Add(2)
// go func() {
// writer := bufio.NewWriter(socket)
// count := rand.Intn(20)
// fmt.Printf("send %d items\n", count)
// for i := 0; i < count; i++ {
// fmt.Printf("write %d\n", i)
// fmt.Fprintf(writer, "%d\n", i)
// }
// writer.WriteString("end\n")
// writer.Flush()
// fmt.Println("done write")
// wg.Done()
// }()
// go func() {
// reader := bufio.NewReader(socket)
// counter := 1
// for {
// content, err := reader.ReadString('\n')
// if err != nil {
// fmt.Println(err)
// break
// }
// fmt.Printf("read %d: '%s'\n", counter, strings.TrimRight(content, "\n"))
// if content == "end\n" {
// break
// }
// counter++
// }
// fmt.Println("done read")
// wg.Done()
// }()
// wg.Wait()
// })
registry := cRPC.NewRegistry()
registry.RegisterCodec(json.NewCodec(), "json")
registry.RegisterService(new(rpc.DiscoveryService), "")
// stop := make(chan os.Signal)
// signal.Notify(stop, syscall.SIGINT)
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// defer wg.Done()
// s.ListenAndServe()
// }()
s := server.New(*sockFile, registry)
// fmt.Printf("Serving Server at %s\n", s.Path())
// select {
// case signal := <-stop:
// fmt.Printf("Got signal: %v\n", signal)
// }
// fmt.Printf("Stopping listener\n")
// s.Close()
// fmt.Printf("Waiting on server\n")
// wg.Wait()
s := server.New("discovery")
stop := make(chan os.Signal)
signal.Notify(stop, syscall.SIGINT)
go func() {
if err := s.Serve(); err != nil {
log.Fatalf("Cannot start rpc server: %s", err)
return
}
}()
select {
case signal := <-stop:
fmt.Printf("Got signal: %v\n", signal)
}
s.Stop()
}
/**

3
main_unix.go Normal file
View File

@ -0,0 +1,3 @@
package main
var serverAddr string = "/tmp/discovery.sock"

3
main_windows.go Normal file
View File

@ -0,0 +1,3 @@
package main
var serverAddr string = "discovery"

25
rpc/discovery_service.go Normal file
View File

@ -0,0 +1,25 @@
package rpc
import (
"log"
)
type DiscoveryService struct {
}
type StartRequestParam struct {
Name string
Count int
}
type StartResponseParam struct {
Result int
}
func (ds *DiscoveryService) Start(req *StartRequestParam, res *StartResponseParam) error {
log.Printf("DiscoveryService.Start param: Name[%s] Count[%d]", req.Name, req.Count)
res.Result = 10
return nil
}

View File

@ -1,12 +0,0 @@
package rpc
import (
"log"
)
type DiscoveryService struct {
}
func (ds *DiscoveryService) Start() {
log.Print("DiscoveryService.Start")
}

View File

@ -2,16 +2,10 @@ package server
import (
"git.loafle.net/commons_go/rpc"
"git.loafle.net/commons_go/rpc/protocol/json"
"git.loafle.net/commons_go/server"
dRPC "git.loafle.net/overflow/overflow_discovery/server/rpc"
)
func New(addr string) server.Server {
registry := rpc.NewRegistry()
registry.RegisterCodec(json.NewCodec(), "json")
registry.RegisterService(new(dRPC.DiscoveryService), "")
func New(addr string, registry rpc.Registry) server.Server {
sh := NewServerHandler(addr, registry)
sh.workersChan = make(chan struct{}, 10)

View File

@ -26,12 +26,17 @@ type ServerHandlers struct {
func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) {
contentType := "json"
Loop:
for {
sh.registry.Invoke(contentType, rwc, rwc, nil, nil)
if err := sh.registry.Invoke(contentType, rwc, rwc, nil, nil); nil != err && sh.IsClientDisconnect(err) {
stopChan <- struct{}{}
break Loop
}
select {
case <-stopChan:
return
default:
}
}