fix data race when spawning tabs concurrently

This fixes the data race uncovered by the recent refactor to run all
tests as tabs under the same browser.

The problem was that a write on the pages map could be done from the
goroutine calling NewContext to create a new map, while other goroutines
could similarly read or write the same map.

Instead of adding a lock around the map, make one of the Browser's
goroutines be the sole user of the map. To make that extra obvious and
avoid potential races in the future, declare the map inside the
goroutine's scope.

For some reason, this makes the Attributes tests flakier than before.
For now, add short sleeps; we can investigate that separately, now that
the data races are gone.
This commit is contained in:
Daniel Martí 2019-04-01 19:31:05 +01:00
parent 7c8529b914
commit 120628a01c
2 changed files with 108 additions and 64 deletions

View File

@ -9,7 +9,6 @@ package chromedp
import (
"context"
"encoding/json"
"fmt"
"log"
"sync/atomic"
@ -26,17 +25,19 @@ import (
type Browser struct {
userDataDir string
pages map[target.SessionID]*Target
conn Transport
// next is the next message id.
next int64
cmdQueue chan cmdJob
// tabQueue is the queue used to create new target handlers, once a new
// tab is created and attached to. The newly created Target is sent back
// via tabResult.
tabQueue chan target.SessionID
tabResult chan *Target
// qres is the incoming command result queue.
qres chan *cdproto.Message
// cmdQueue is the outgoing command queue.
cmdQueue chan cmdJob
// logging funcs
logf func(string, ...interface{})
@ -58,10 +59,10 @@ func NewBrowser(ctx context.Context, urlstr string, opts ...BrowserOption) (*Bro
b := &Browser{
conn: conn,
pages: make(map[target.SessionID]*Target, 1024),
tabQueue: make(chan target.SessionID, 1),
tabResult: make(chan *Target, 1),
cmdQueue: make(chan cmdJob),
qres: make(chan *cdproto.Message),
logf: log.Printf,
}
@ -107,23 +108,8 @@ func (b *Browser) newExecutorForTarget(ctx context.Context, sessionID target.Ses
if sessionID == "" {
panic("empty session ID")
}
if _, ok := b.pages[sessionID]; ok {
panic(fmt.Sprintf("executor for %q already exists", sessionID))
}
t := &Target{
browser: b,
SessionID: sessionID,
eventQueue: make(chan *cdproto.Message, 1024),
waitQueue: make(chan func(cur *cdp.Frame) bool, 1024),
frames: make(map[cdp.FrameID]*cdp.Frame),
logf: b.logf,
errf: b.errf,
}
go t.run(ctx)
b.pages[sessionID] = t
return t
b.tabQueue <- sessionID
return <-b.tabResult
}
func (b *Browser) Execute(ctx context.Context, method string, params json.Marshaler, res json.Unmarshaler) error {
@ -161,6 +147,11 @@ func (b *Browser) Execute(ctx context.Context, method string, params json.Marsha
return nil
}
type tabEvent struct {
sessionID target.SessionID
msg *cdproto.Message
}
func (b *Browser) run(ctx context.Context) {
defer b.conn.Close()
@ -168,77 +159,120 @@ func (b *Browser) run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// tabEventQueue is the queue of incoming target events, to be routed by
// their session ID.
tabEventQueue := make(chan tabEvent, 1)
// resQueue is the incoming command result queue.
resQueue := make(chan *cdproto.Message, 1)
// This goroutine continuously reads events from the websocket
// connection. The separate goroutine is needed since a websocket read
// is blocking, so it cannot be used in a select statement.
go func() {
defer cancel()
for {
select {
case <-ctx.Done():
return
default:
// continue below
}
msg, err := b.conn.Read()
if err != nil {
return
}
var sessionID target.SessionID
if msg.Method == cdproto.EventTargetReceivedMessageFromTarget {
recv := new(target.EventReceivedMessageFromTarget)
if err := json.Unmarshal(msg.Params, recv); err != nil {
if msg.Method == cdproto.EventRuntimeExceptionThrown {
ev := new(runtime.EventExceptionThrown)
if err := json.Unmarshal(msg.Params, ev); err != nil {
b.errf("%s", err)
continue
}
sessionID = recv.SessionID
b.errf("%+v\n", ev.ExceptionDetails)
continue
}
var sessionID target.SessionID
if msg.Method == cdproto.EventTargetReceivedMessageFromTarget {
event := new(target.EventReceivedMessageFromTarget)
if err := json.Unmarshal(msg.Params, event); err != nil {
b.errf("%s", err)
continue
}
sessionID = event.SessionID
msg = new(cdproto.Message)
if err := json.Unmarshal([]byte(recv.Message), msg); err != nil {
if err := json.Unmarshal([]byte(event.Message), msg); err != nil {
b.errf("%s", err)
continue
}
}
switch {
case msg.Method != "":
if sessionID == "" {
// TODO: are we interested in
// these events?
// TODO: are we interested in browser events?
continue
}
if msg.Method == cdproto.EventRuntimeExceptionThrown {
ev := new(runtime.EventExceptionThrown)
if err := json.Unmarshal(msg.Params, ev); err != nil {
b.errf("%s", err)
continue
}
b.errf("%+v\n", ev.ExceptionDetails)
tabEventQueue <- tabEvent{
sessionID: sessionID,
msg: msg,
}
page, ok := b.pages[sessionID]
if !ok {
b.errf("unknown session ID %q", sessionID)
continue
}
select {
case page.eventQueue <- msg:
default:
panic("eventQueue is full")
}
case msg.ID != 0:
b.qres <- msg
// We can't process the response here, as it's
// another goroutine that maintans respByID.
resQueue <- msg
default:
b.errf("ignoring malformed incoming message (missing id or method): %#v", msg)
}
}
}()
// This goroutine handles tabs, as well as routing events to each tab
// via the pages map.
go func() {
defer cancel()
// This map is only safe for use within this goroutine, so don't
// declare it as a Browser field.
pages := make(map[target.SessionID]*Target, 1024)
for {
select {
case sessionID := <-b.tabQueue:
if _, ok := pages[sessionID]; ok {
b.errf("executor for %q already exists", sessionID)
}
t := &Target{
browser: b,
SessionID: sessionID,
eventQueue: make(chan *cdproto.Message, 1024),
waitQueue: make(chan func(cur *cdp.Frame) bool, 1024),
frames: make(map[cdp.FrameID]*cdp.Frame),
logf: b.logf,
errf: b.errf,
}
go t.run(ctx)
pages[sessionID] = t
b.tabResult <- t
case event := <-tabEventQueue:
page, ok := pages[event.sessionID]
if !ok {
b.errf("unknown session ID %q", event.sessionID)
continue
}
select {
case page.eventQueue <- event.msg:
default:
panic("eventQueue is full")
}
case <-ctx.Done():
return
}
}
}()
respByID := make(map[int64]chan *cdproto.Message)
// process queues
// This goroutine handles sending commands to the browser, and sending
// responses back for each of these commands via respByID.
for {
select {
case res := <-b.qres:
case res := <-resQueue:
resp, ok := respByID[res.ID]
if !ok {
b.errf("id %d not present in response map", res.ID)

View File

@ -11,6 +11,7 @@ import (
"os"
"reflect"
"testing"
"time"
"github.com/chromedp/cdproto/cdp"
"github.com/chromedp/cdproto/css"
@ -510,6 +511,9 @@ func TestSetAttributes(t *testing.T) {
t.Fatalf("got error: %v", err)
}
// TODO: figure why this test is flaky without this
time.Sleep(10 * time.Millisecond)
var attrs map[string]string
if err := Run(ctx, Attributes(test.sel, &attrs, test.by)); err != nil {
t.Fatalf("got error: %v", err)
@ -585,6 +589,9 @@ func TestSetAttributeValue(t *testing.T) {
t.Fatalf("got error: %v", err)
}
// TODO: figure why this test is flaky without this
time.Sleep(10 * time.Millisecond)
var value string
var ok bool
if err := Run(ctx, AttributeValue(test.sel, test.attr, &value, &ok, test.by)); err != nil {
@ -629,6 +636,9 @@ func TestRemoveAttribute(t *testing.T) {
t.Fatalf("got error: %v", err)
}
// TODO: figure why this test is flaky without this
time.Sleep(10 * time.Millisecond)
var value string
var ok bool
if err := Run(ctx, AttributeValue(test.sel, test.attr, &value, &ok, test.by)); err != nil {