From 120628a01c6b533afdd4f85b126973fb34d2dfda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Mon, 1 Apr 2019 19:31:05 +0100 Subject: [PATCH] fix data race when spawning tabs concurrently This fixes the data race uncovered by the recent refactor to run all tests as tabs under the same browser. The problem was that a write on the pages map could be done from the goroutine calling NewContext to create a new map, while other goroutines could similarly read or write the same map. Instead of adding a lock around the map, make one of the Browser's goroutines be the sole user of the map. To make that extra obvious and avoid potential races in the future, declare the map inside the goroutine's scope. For some reason, this makes the Attributes tests flakier than before. For now, add short sleeps; we can investigate that separately, now that the data races are gone. --- browser.go | 162 ++++++++++++++++++++++++++++++-------------------- query_test.go | 10 ++++ 2 files changed, 108 insertions(+), 64 deletions(-) diff --git a/browser.go b/browser.go index 2d57395..3942f9c 100644 --- a/browser.go +++ b/browser.go @@ -9,7 +9,6 @@ package chromedp import ( "context" "encoding/json" - "fmt" "log" "sync/atomic" @@ -26,17 +25,19 @@ import ( type Browser struct { userDataDir string - pages map[target.SessionID]*Target - conn Transport // next is the next message id. next int64 - cmdQueue chan cmdJob + // tabQueue is the queue used to create new target handlers, once a new + // tab is created and attached to. The newly created Target is sent back + // via tabResult. + tabQueue chan target.SessionID + tabResult chan *Target - // qres is the incoming command result queue. - qres chan *cdproto.Message + // cmdQueue is the outgoing command queue. + cmdQueue chan cmdJob // logging funcs logf func(string, ...interface{}) @@ -58,10 +59,10 @@ func NewBrowser(ctx context.Context, urlstr string, opts ...BrowserOption) (*Bro b := &Browser{ conn: conn, - pages: make(map[target.SessionID]*Target, 1024), + tabQueue: make(chan target.SessionID, 1), + tabResult: make(chan *Target, 1), cmdQueue: make(chan cmdJob), - qres: make(chan *cdproto.Message), logf: log.Printf, } @@ -107,23 +108,8 @@ func (b *Browser) newExecutorForTarget(ctx context.Context, sessionID target.Ses if sessionID == "" { panic("empty session ID") } - if _, ok := b.pages[sessionID]; ok { - panic(fmt.Sprintf("executor for %q already exists", sessionID)) - } - t := &Target{ - browser: b, - SessionID: sessionID, - - eventQueue: make(chan *cdproto.Message, 1024), - waitQueue: make(chan func(cur *cdp.Frame) bool, 1024), - frames: make(map[cdp.FrameID]*cdp.Frame), - - logf: b.logf, - errf: b.errf, - } - go t.run(ctx) - b.pages[sessionID] = t - return t + b.tabQueue <- sessionID + return <-b.tabResult } func (b *Browser) Execute(ctx context.Context, method string, params json.Marshaler, res json.Unmarshaler) error { @@ -161,6 +147,11 @@ func (b *Browser) Execute(ctx context.Context, method string, params json.Marsha return nil } +type tabEvent struct { + sessionID target.SessionID + msg *cdproto.Message +} + func (b *Browser) run(ctx context.Context) { defer b.conn.Close() @@ -168,77 +159,120 @@ func (b *Browser) run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() + // tabEventQueue is the queue of incoming target events, to be routed by + // their session ID. + tabEventQueue := make(chan tabEvent, 1) + + // resQueue is the incoming command result queue. + resQueue := make(chan *cdproto.Message, 1) + + // This goroutine continuously reads events from the websocket + // connection. The separate goroutine is needed since a websocket read + // is blocking, so it cannot be used in a select statement. go func() { defer cancel() - for { - select { - case <-ctx.Done(): - return - default: - // continue below - } msg, err := b.conn.Read() if err != nil { return } - var sessionID target.SessionID - if msg.Method == cdproto.EventTargetReceivedMessageFromTarget { - recv := new(target.EventReceivedMessageFromTarget) - if err := json.Unmarshal(msg.Params, recv); err != nil { + if msg.Method == cdproto.EventRuntimeExceptionThrown { + ev := new(runtime.EventExceptionThrown) + if err := json.Unmarshal(msg.Params, ev); err != nil { b.errf("%s", err) continue } - sessionID = recv.SessionID + b.errf("%+v\n", ev.ExceptionDetails) + continue + } + + var sessionID target.SessionID + if msg.Method == cdproto.EventTargetReceivedMessageFromTarget { + event := new(target.EventReceivedMessageFromTarget) + if err := json.Unmarshal(msg.Params, event); err != nil { + b.errf("%s", err) + continue + } + sessionID = event.SessionID msg = new(cdproto.Message) - if err := json.Unmarshal([]byte(recv.Message), msg); err != nil { + if err := json.Unmarshal([]byte(event.Message), msg); err != nil { b.errf("%s", err) continue } } - switch { case msg.Method != "": if sessionID == "" { - // TODO: are we interested in - // these events? + // TODO: are we interested in browser events? continue } - if msg.Method == cdproto.EventRuntimeExceptionThrown { - ev := new(runtime.EventExceptionThrown) - if err := json.Unmarshal(msg.Params, ev); err != nil { - b.errf("%s", err) - continue - } - b.errf("%+v\n", ev.ExceptionDetails) + tabEventQueue <- tabEvent{ + sessionID: sessionID, + msg: msg, } - - page, ok := b.pages[sessionID] - if !ok { - b.errf("unknown session ID %q", sessionID) - continue - } - select { - case page.eventQueue <- msg: - default: - panic("eventQueue is full") - } - case msg.ID != 0: - b.qres <- msg - + // We can't process the response here, as it's + // another goroutine that maintans respByID. + resQueue <- msg default: b.errf("ignoring malformed incoming message (missing id or method): %#v", msg) } } }() + // This goroutine handles tabs, as well as routing events to each tab + // via the pages map. + go func() { + defer cancel() + + // This map is only safe for use within this goroutine, so don't + // declare it as a Browser field. + pages := make(map[target.SessionID]*Target, 1024) + for { + select { + case sessionID := <-b.tabQueue: + if _, ok := pages[sessionID]; ok { + b.errf("executor for %q already exists", sessionID) + } + t := &Target{ + browser: b, + SessionID: sessionID, + + eventQueue: make(chan *cdproto.Message, 1024), + waitQueue: make(chan func(cur *cdp.Frame) bool, 1024), + frames: make(map[cdp.FrameID]*cdp.Frame), + + logf: b.logf, + errf: b.errf, + } + go t.run(ctx) + pages[sessionID] = t + b.tabResult <- t + case event := <-tabEventQueue: + page, ok := pages[event.sessionID] + if !ok { + b.errf("unknown session ID %q", event.sessionID) + continue + } + select { + case page.eventQueue <- event.msg: + default: + panic("eventQueue is full") + } + + case <-ctx.Done(): + return + } + } + }() + respByID := make(map[int64]chan *cdproto.Message) - // process queues + // This goroutine handles sending commands to the browser, and sending + // responses back for each of these commands via respByID. for { select { - case res := <-b.qres: + case res := <-resQueue: resp, ok := respByID[res.ID] if !ok { b.errf("id %d not present in response map", res.ID) diff --git a/query_test.go b/query_test.go index 500bf4e..4a1787d 100644 --- a/query_test.go +++ b/query_test.go @@ -11,6 +11,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/chromedp/cdproto/cdp" "github.com/chromedp/cdproto/css" @@ -510,6 +511,9 @@ func TestSetAttributes(t *testing.T) { t.Fatalf("got error: %v", err) } + // TODO: figure why this test is flaky without this + time.Sleep(10 * time.Millisecond) + var attrs map[string]string if err := Run(ctx, Attributes(test.sel, &attrs, test.by)); err != nil { t.Fatalf("got error: %v", err) @@ -585,6 +589,9 @@ func TestSetAttributeValue(t *testing.T) { t.Fatalf("got error: %v", err) } + // TODO: figure why this test is flaky without this + time.Sleep(10 * time.Millisecond) + var value string var ok bool if err := Run(ctx, AttributeValue(test.sel, test.attr, &value, &ok, test.by)); err != nil { @@ -629,6 +636,9 @@ func TestRemoveAttribute(t *testing.T) { t.Fatalf("got error: %v", err) } + // TODO: figure why this test is flaky without this + time.Sleep(10 * time.Millisecond) + var value string var ok bool if err := Run(ctx, AttributeValue(test.sel, test.attr, &value, &ok, test.by)); err != nil {