commit b63d063fd3980f2229c3486d461ce69a09458e0e Author: jackdaw Date: Mon Nov 28 14:13:15 2016 +0900 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e7e5af6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,72 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/workspace.xml +.idea/tasks.xml +.idea/dictionaries +.idea/vcs.xml +.idea/jsLibraryMappings.xml + +# Sensitive or high-churn files: +.idea/dataSources.ids +.idea/dataSources.xml +.idea/dataSources.local.xml +.idea/sqlDataSources.xml +.idea/dynamic.xml +.idea/uiDesigner.xml + +# Gradle: +.idea/gradle.xml +.idea/libraries + +# Mongo Explorer plugin: +.idea/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Go template +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof + diff --git a/communicate.go b/communicate.go new file mode 100644 index 0000000..32b7aed --- /dev/null +++ b/communicate.go @@ -0,0 +1,18 @@ +package communicate + +import ( + "fmt" + "loafle.com/commons/communicate/events" +) + +var _c *communicator = nil + +func init() { + fmt.Println("init communicator") + _c = NewCommunicator() + _c.start() +} + +func Send(e *events.Event) { + _c.addEvent(e) +} diff --git a/communicate_test.go b/communicate_test.go new file mode 100644 index 0000000..cc3ec4c --- /dev/null +++ b/communicate_test.go @@ -0,0 +1,80 @@ +package communicate + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/stretchr/testify/assert" + "gopkg.in/gin-gonic/gin.v1" + "io/ioutil" + "loafle.com/commons/communicate/events" + "net/http" + "net/http/httptest" + "testing" +) + +func TestCommunicatorInit(t *testing.T) { + assert.NotNil(t, _c) +} + +func makeGin() *gin.Engine { + r := gin.New() + api := r.Group("/_api") + { + collector := api.Group("/collector") + { + event := collector.Group("/event") + { + { + types := event.Group("/status") + { + types.POST("/:type", func(c *gin.Context) { + fmt.Println("called /_api/collector/event/status/:type") + var j events.Event + c.BindJSON(&j) + fmt.Println(j) + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + }) + + } + } + } + } + } + return r +} + +func TestSend(t *testing.T) { + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + defer ts.Close() + for i := 0; i < 10; i++ { + e := events.NewEvent(events.CENTRAL_EVENT, events.InstallEvent{Message: "Test"}) + Send(e) + t.Log(e.GetResult()) + } +} + +func TestRealSendByGin(t *testing.T) { + + e := events.NewEvent(events.CENTRAL_EVENT, events.NewInstallEvent("TestInstallEvent")) + data, _ := json.Marshal(&e) + + var u events.URLMaker + u = e.Data.(events.URLMaker) + t.Log(u.GetUrl()) + req := httptest.NewRequest("POST", u.GetUrl(), bytes.NewReader(data)) + w := httptest.NewRecorder() + + g := makeGin() + g.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Home page didn't return %v", http.StatusOK) + } else { + t.Log("OKOKOK") + data, _ := ioutil.ReadAll(w.Body) + t.Log(string(data)) + } +} diff --git a/communicator.go b/communicator.go new file mode 100644 index 0000000..edfbc0b --- /dev/null +++ b/communicator.go @@ -0,0 +1,33 @@ +package communicate + +import ( + "bytes" + "encoding/json" + "loafle.com/commons/communicate/events" + "net/http" +) + +type communicator struct { + Queue chan *events.Event +} + +func NewCommunicator() *communicator { + return &communicator{Queue: make(chan *events.Event, 10)} +} + +func (c *communicator) addEvent(e *events.Event) { + c.Queue <- e +} + +func (c *communicator) start() { + go func() { + for e := range c.Queue { + go func(event *events.Event) { + m := event.Data.(events.URLMaker) + data, _ := json.Marshal(event) + r, _ := http.Post(m.GetUrl(), "application/json", bytes.NewBuffer(data)) + e.Result <- events.Result{Message: r.Status} + }(e) + } + }() +} diff --git a/communicator_test.go b/communicator_test.go new file mode 100644 index 0000000..b8c6bb6 --- /dev/null +++ b/communicator_test.go @@ -0,0 +1 @@ +package communicate diff --git a/events/collector.go b/events/collector.go new file mode 100644 index 0000000..aa73516 --- /dev/null +++ b/events/collector.go @@ -0,0 +1,13 @@ +package events + +type InstallEvent struct { + Message string +} + +func (i InstallEvent) GetUrl() string { + return COLLECTOR_INSTALL +} + +func NewInstallEvent(message string) *InstallEvent { + return &InstallEvent{Message: message} +} diff --git a/events/event.go b/events/event.go new file mode 100644 index 0000000..13b805e --- /dev/null +++ b/events/event.go @@ -0,0 +1,36 @@ +package events + +const ( + CENTRAL_POLLING = "P" + CENTRAL_EVENT = "E" +) + +const ( + COLLECTOR_INSTALL = "http://localhost:8080/_api/collector/event/status/install" +) + +type URLMaker interface { + GetUrl() string +} + +type Event struct { + Collector_id string + Time Timestamp + EventType string + Data interface{} + Result chan Result `json:"-"` +} + +func (e *Event) GetResult() Result { + return <-e.Result +} + +func NewEvent(dataType string, data interface{}) *Event { + return &Event{ + Collector_id: "", // get this collector identity + Time: Now(), + EventType: dataType, + Data: data, + Result: make(chan Result), + } +} diff --git a/events/event_test.go b/events/event_test.go new file mode 100644 index 0000000..580f912 --- /dev/null +++ b/events/event_test.go @@ -0,0 +1,25 @@ +package events + +import ( + "github.com/stretchr/testify/assert" + "loafle.com/commons/communicate" + "testing" + "time" +) + +func TestCommunicatorGetResult(t *testing.T) { + + const TEST_MESSAGE = "TEST" + e := &Event{Result: make(chan Result)} + communicate.Send(e) + + go func() { + assert.Equal(t, TEST_MESSAGE, e.GetResult().Message) + }() + + time.Sleep(time.Second) + + go func() { + e.Result <- Result{Message: TEST_MESSAGE} + }() +} diff --git a/events/result.go b/events/result.go new file mode 100644 index 0000000..ecda60e --- /dev/null +++ b/events/result.go @@ -0,0 +1,5 @@ +package events + +type Result struct { + Message string +} diff --git a/events/result_test.go b/events/result_test.go new file mode 100644 index 0000000..ef551be --- /dev/null +++ b/events/result_test.go @@ -0,0 +1,14 @@ +package events + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func GetTempResult() Result { + return Result{Message: "Test"} +} + +func TestResult(t *testing.T) { + assert.Equal(t, GetTempResult().Message, "Test") +} diff --git a/events/timestamp.go b/events/timestamp.go new file mode 100644 index 0000000..ccc56a0 --- /dev/null +++ b/events/timestamp.go @@ -0,0 +1,37 @@ +package events + +import ( + "fmt" + "strconv" + "time" +) + +type Timestamp time.Time + +func (t Timestamp) MarshalJSON() ([]byte, error) { + ts := time.Time(t).Unix() + stamp := fmt.Sprint(ts * 1000) + return []byte(stamp), nil +} + +func (t *Timestamp) UnmarshalJSON(b []byte) error { + ts, err := strconv.Atoi(string(b)) + if err != nil { + return err + } + *t = Timestamp(time.Unix(int64(ts)/1000, 0)) + + return nil +} + +func (t Timestamp) String() string { + return time.Time(t).String() +} + +func Now() Timestamp { + return Timestamp(time.Now()) +} + +func Date(year int, month time.Month, day int) Timestamp { + return Timestamp(time.Date(year, month, day, 0, 0, 0, 0, time.UTC)) +}