diff --git a/main.go b/main.go index 4b1da6e..2dcca35 100644 --- a/main.go +++ b/main.go @@ -1,75 +1,52 @@ package main import ( + "flag" + "fmt" "log" + "os" + "os/signal" + "syscall" + cRPC "git.loafle.net/commons_go/rpc" + "git.loafle.net/commons_go/rpc/protocol/json" + "git.loafle.net/overflow/overflow_discovery/rpc" "git.loafle.net/overflow/overflow_discovery/server" ) +var ( + sockFile *string +) + +func init() { + sockFile = flag.String("sock", serverAddr, "Socket file") + flag.Parse() + +} + func main() { - // s := local_socket.NewLocalServer("discovery") - // s.SetOnConnectionCallback(func(socket net.Conn) { - // defer socket.Close() - // var wg sync.WaitGroup - // wg.Add(2) - // go func() { - // writer := bufio.NewWriter(socket) - // count := rand.Intn(20) - // fmt.Printf("send %d items\n", count) - // for i := 0; i < count; i++ { - // fmt.Printf("write %d\n", i) - // fmt.Fprintf(writer, "%d\n", i) - // } - // writer.WriteString("end\n") - // writer.Flush() - // fmt.Println("done write") - // wg.Done() - // }() - // go func() { - // reader := bufio.NewReader(socket) - // counter := 1 - // for { - // content, err := reader.ReadString('\n') - // if err != nil { - // fmt.Println(err) - // break - // } - // fmt.Printf("read %d: '%s'\n", counter, strings.TrimRight(content, "\n")) - // if content == "end\n" { - // break - // } - // counter++ - // } - // fmt.Println("done read") - // wg.Done() - // }() - // wg.Wait() - // }) + registry := cRPC.NewRegistry() + registry.RegisterCodec(json.NewCodec(), "json") + registry.RegisterService(new(rpc.DiscoveryService), "") - // stop := make(chan os.Signal) - // signal.Notify(stop, syscall.SIGINT) - // var wg sync.WaitGroup - // wg.Add(1) - // go func() { - // defer wg.Done() - // s.ListenAndServe() - // }() + s := server.New(*sockFile, registry) - // fmt.Printf("Serving Server at %s\n", s.Path()) - // select { - // case signal := <-stop: - // fmt.Printf("Got signal: %v\n", signal) - // } - // fmt.Printf("Stopping listener\n") - // s.Close() - // fmt.Printf("Waiting on server\n") - // wg.Wait() + stop := make(chan os.Signal) + signal.Notify(stop, syscall.SIGINT) - s := server.New("discovery") + go func() { + if err := s.Serve(); err != nil { + log.Fatalf("Cannot start rpc server: %s", err) + return + } + }() - if err := s.Serve(); err != nil { - log.Fatalf("Cannot start rpc server: %s", err) + select { + case signal := <-stop: + fmt.Printf("Got signal: %v\n", signal) } + s.Stop() + } /** diff --git a/main_unix.go b/main_unix.go new file mode 100644 index 0000000..0bcdf6a --- /dev/null +++ b/main_unix.go @@ -0,0 +1,3 @@ +package main + +var serverAddr string = "/tmp/discovery.sock" diff --git a/main_windows.go b/main_windows.go new file mode 100644 index 0000000..d6c6339 --- /dev/null +++ b/main_windows.go @@ -0,0 +1,3 @@ +package main + +var serverAddr string = "discovery" diff --git a/rpc/discovery_service.go b/rpc/discovery_service.go new file mode 100644 index 0000000..cba92de --- /dev/null +++ b/rpc/discovery_service.go @@ -0,0 +1,25 @@ +package rpc + +import ( + "log" +) + +type DiscoveryService struct { +} + +type StartRequestParam struct { + Name string + Count int +} + +type StartResponseParam struct { + Result int +} + +func (ds *DiscoveryService) Start(req *StartRequestParam, res *StartResponseParam) error { + log.Printf("DiscoveryService.Start param: Name[%s] Count[%d]", req.Name, req.Count) + + res.Result = 10 + + return nil +} diff --git a/server/rpc/discovery_service.go b/server/rpc/discovery_service.go deleted file mode 100644 index f6a009d..0000000 --- a/server/rpc/discovery_service.go +++ /dev/null @@ -1,12 +0,0 @@ -package rpc - -import ( - "log" -) - -type DiscoveryService struct { -} - -func (ds *DiscoveryService) Start() { - log.Print("DiscoveryService.Start") -} diff --git a/server/server.go b/server/server.go index a4b6cbc..2cc0589 100644 --- a/server/server.go +++ b/server/server.go @@ -2,16 +2,10 @@ package server import ( "git.loafle.net/commons_go/rpc" - "git.loafle.net/commons_go/rpc/protocol/json" "git.loafle.net/commons_go/server" - - dRPC "git.loafle.net/overflow/overflow_discovery/server/rpc" ) -func New(addr string) server.Server { - registry := rpc.NewRegistry() - registry.RegisterCodec(json.NewCodec(), "json") - registry.RegisterService(new(dRPC.DiscoveryService), "") +func New(addr string, registry rpc.Registry) server.Server { sh := NewServerHandler(addr, registry) sh.workersChan = make(chan struct{}, 10) diff --git a/server/server_handlers.go b/server/server_handlers.go index 3dd02fe..77172f3 100644 --- a/server/server_handlers.go +++ b/server/server_handlers.go @@ -26,12 +26,17 @@ type ServerHandlers struct { func (sh *ServerHandlers) Handle(remoteAddr string, rwc io.ReadWriteCloser, stopChan chan struct{}) { contentType := "json" +Loop: for { - sh.registry.Invoke(contentType, rwc, rwc, nil, nil) + if err := sh.registry.Invoke(contentType, rwc, rwc, nil, nil); nil != err && sh.IsClientDisconnect(err) { + stopChan <- struct{}{} + break Loop + } select { case <-stopChan: return + default: } }