diff --git a/browser.go b/browser.go index 2d57395..3942f9c 100644 --- a/browser.go +++ b/browser.go @@ -9,7 +9,6 @@ package chromedp import ( "context" "encoding/json" - "fmt" "log" "sync/atomic" @@ -26,17 +25,19 @@ import ( type Browser struct { userDataDir string - pages map[target.SessionID]*Target - conn Transport // next is the next message id. next int64 - cmdQueue chan cmdJob + // tabQueue is the queue used to create new target handlers, once a new + // tab is created and attached to. The newly created Target is sent back + // via tabResult. + tabQueue chan target.SessionID + tabResult chan *Target - // qres is the incoming command result queue. - qres chan *cdproto.Message + // cmdQueue is the outgoing command queue. + cmdQueue chan cmdJob // logging funcs logf func(string, ...interface{}) @@ -58,10 +59,10 @@ func NewBrowser(ctx context.Context, urlstr string, opts ...BrowserOption) (*Bro b := &Browser{ conn: conn, - pages: make(map[target.SessionID]*Target, 1024), + tabQueue: make(chan target.SessionID, 1), + tabResult: make(chan *Target, 1), cmdQueue: make(chan cmdJob), - qres: make(chan *cdproto.Message), logf: log.Printf, } @@ -107,23 +108,8 @@ func (b *Browser) newExecutorForTarget(ctx context.Context, sessionID target.Ses if sessionID == "" { panic("empty session ID") } - if _, ok := b.pages[sessionID]; ok { - panic(fmt.Sprintf("executor for %q already exists", sessionID)) - } - t := &Target{ - browser: b, - SessionID: sessionID, - - eventQueue: make(chan *cdproto.Message, 1024), - waitQueue: make(chan func(cur *cdp.Frame) bool, 1024), - frames: make(map[cdp.FrameID]*cdp.Frame), - - logf: b.logf, - errf: b.errf, - } - go t.run(ctx) - b.pages[sessionID] = t - return t + b.tabQueue <- sessionID + return <-b.tabResult } func (b *Browser) Execute(ctx context.Context, method string, params json.Marshaler, res json.Unmarshaler) error { @@ -161,6 +147,11 @@ func (b *Browser) Execute(ctx context.Context, method string, params json.Marsha return nil } +type 
tabEvent struct { + sessionID target.SessionID + msg *cdproto.Message +} + func (b *Browser) run(ctx context.Context) { defer b.conn.Close() @@ -168,77 +159,120 @@ func (b *Browser) run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() + // tabEventQueue is the queue of incoming target events, to be routed by + // their session ID. + tabEventQueue := make(chan tabEvent, 1) + + // resQueue is the incoming command result queue. + resQueue := make(chan *cdproto.Message, 1) + + // This goroutine continuously reads events from the websocket + // connection. The separate goroutine is needed since a websocket read + // is blocking, so it cannot be used in a select statement. go func() { defer cancel() - for { - select { - case <-ctx.Done(): - return - default: - // continue below - } msg, err := b.conn.Read() if err != nil { return } - var sessionID target.SessionID - if msg.Method == cdproto.EventTargetReceivedMessageFromTarget { - recv := new(target.EventReceivedMessageFromTarget) - if err := json.Unmarshal(msg.Params, recv); err != nil { + if msg.Method == cdproto.EventRuntimeExceptionThrown { + ev := new(runtime.EventExceptionThrown) + if err := json.Unmarshal(msg.Params, ev); err != nil { b.errf("%s", err) continue } - sessionID = recv.SessionID + b.errf("%+v\n", ev.ExceptionDetails) + continue + } + + var sessionID target.SessionID + if msg.Method == cdproto.EventTargetReceivedMessageFromTarget { + event := new(target.EventReceivedMessageFromTarget) + if err := json.Unmarshal(msg.Params, event); err != nil { + b.errf("%s", err) + continue + } + sessionID = event.SessionID msg = new(cdproto.Message) - if err := json.Unmarshal([]byte(recv.Message), msg); err != nil { + if err := json.Unmarshal([]byte(event.Message), msg); err != nil { b.errf("%s", err) continue } } - switch { case msg.Method != "": if sessionID == "" { - // TODO: are we interested in - // these events? + // TODO: are we interested in browser events? 
continue } - if msg.Method == cdproto.EventRuntimeExceptionThrown { - ev := new(runtime.EventExceptionThrown) - if err := json.Unmarshal(msg.Params, ev); err != nil { - b.errf("%s", err) - continue - } - b.errf("%+v\n", ev.ExceptionDetails) + tabEventQueue <- tabEvent{ + sessionID: sessionID, + msg: msg, } - - page, ok := b.pages[sessionID] - if !ok { - b.errf("unknown session ID %q", sessionID) - continue - } - select { - case page.eventQueue <- msg: - default: - panic("eventQueue is full") - } - case msg.ID != 0: - b.qres <- msg - + // We can't process the response here, as it's + // another goroutine that maintains respByID. + resQueue <- msg default: b.errf("ignoring malformed incoming message (missing id or method): %#v", msg) } } }() + // This goroutine handles tabs, as well as routing events to each tab + // via the pages map. + go func() { + defer cancel() + + // This map is only safe for use within this goroutine, so don't + // declare it as a Browser field. + pages := make(map[target.SessionID]*Target, 1024) + for { + select { + case sessionID := <-b.tabQueue: + if _, ok := pages[sessionID]; ok { + b.errf("executor for %q already exists", sessionID) + } + t := &Target{ + browser: b, + SessionID: sessionID, + + eventQueue: make(chan *cdproto.Message, 1024), + waitQueue: make(chan func(cur *cdp.Frame) bool, 1024), + frames: make(map[cdp.FrameID]*cdp.Frame), + + logf: b.logf, + errf: b.errf, + } + go t.run(ctx) + pages[sessionID] = t + b.tabResult <- t + case event := <-tabEventQueue: + page, ok := pages[event.sessionID] + if !ok { + b.errf("unknown session ID %q", event.sessionID) + continue + } + select { + case page.eventQueue <- event.msg: + default: + panic("eventQueue is full") + } + + case <-ctx.Done(): + return + } + } + }() + respByID := make(map[int64]chan *cdproto.Message) - // process queues + // This goroutine handles sending commands to the browser, and sending + // responses back for each of these commands via respByID. 
for { select { - case res := <-b.qres: + case res := <-resQueue: resp, ok := respByID[res.ID] if !ok { b.errf("id %d not present in response map", res.ID) diff --git a/query_test.go b/query_test.go index 500bf4e..4a1787d 100644 --- a/query_test.go +++ b/query_test.go @@ -11,6 +11,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/chromedp/cdproto/cdp" "github.com/chromedp/cdproto/css" @@ -510,6 +511,9 @@ func TestSetAttributes(t *testing.T) { t.Fatalf("got error: %v", err) } + // TODO: figure out why this test is flaky without this + time.Sleep(10 * time.Millisecond) + var attrs map[string]string if err := Run(ctx, Attributes(test.sel, &attrs, test.by)); err != nil { t.Fatalf("got error: %v", err) @@ -585,6 +589,9 @@ func TestSetAttributeValue(t *testing.T) { t.Fatalf("got error: %v", err) } + // TODO: figure out why this test is flaky without this + time.Sleep(10 * time.Millisecond) + var value string var ok bool if err := Run(ctx, AttributeValue(test.sel, test.attr, &value, &ok, test.by)); err != nil { @@ -629,6 +636,9 @@ func TestRemoveAttribute(t *testing.T) { t.Fatalf("got error: %v", err) } + // TODO: figure out why this test is flaky without this + time.Sleep(10 * time.Millisecond) + var value string var ok bool if err := Run(ctx, AttributeValue(test.sel, test.attr, &value, &ok, test.by)); err != nil {