chromedp/handler.go
2017-01-24 22:09:23 +07:00

660 lines
13 KiB
Go

package chromedp
import (
"context"
"fmt"
"log"
"reflect"
"runtime"
"strings"
"sync"
"time"
"github.com/mailru/easyjson"
. "github.com/knq/chromedp/cdp"
"github.com/knq/chromedp/cdp/css"
"github.com/knq/chromedp/cdp/dom"
"github.com/knq/chromedp/cdp/inspector"
logdom "github.com/knq/chromedp/cdp/log"
"github.com/knq/chromedp/cdp/page"
rundom "github.com/knq/chromedp/cdp/runtime"
"github.com/knq/chromedp/client"
)
var (
_ FrameHandler = &TargetHandler{}
)
// TargetHandler manages a Chrome Debugging Protocol target.
type TargetHandler struct {
conn client.Transport
// frames is the set of encountered frames.
frames map[FrameID]*Frame
// cur is the current top level frame.
cur *Frame
// qcmd is the outgoing message queue.
qcmd chan *Message
// qres is the incoming command result queue.
qres chan *Message
// qevents is the incoming event queue.
qevents chan *Message
// detached is closed when the detached event is received.
detached chan *inspector.EventDetached
pageWaitGroup, domWaitGroup *sync.WaitGroup
// last is the last sent message identifier.
last int64
lastm sync.Mutex
// res is the id->result channel map.
res map[int64]chan interface{}
resrw sync.RWMutex
sync.RWMutex
}
// NewTargetHandler creates a new manager for the specified client target.
func NewTargetHandler(t client.Target) (*TargetHandler, error) {
conn, err := client.Dial(t)
if err != nil {
return nil, err
}
return &TargetHandler{conn: conn}, nil
}
// Run starts the processing of commands and events to the client target
// provided to NewTargetHandler.
//
// Callers can stop Run by closing the passed context.
func (h *TargetHandler) Run(ctxt context.Context) error {
var err error
// reset
h.Lock()
h.frames = make(map[FrameID]*Frame)
h.qcmd = make(chan *Message)
h.qres = make(chan *Message)
h.qevents = make(chan *Message)
h.res = make(map[int64]chan interface{})
h.detached = make(chan *inspector.EventDetached)
h.pageWaitGroup = new(sync.WaitGroup)
h.domWaitGroup = new(sync.WaitGroup)
h.Unlock()
// run
go h.run(ctxt)
// enable domains
for _, a := range []Action{
logdom.Enable(),
rundom.Enable(),
//network.Enable(),
inspector.Enable(),
page.Enable(),
dom.Enable(),
css.Enable(),
} {
err = a.Do(ctxt, h)
if err != nil {
return fmt.Errorf("unable to execute %s, got: %v", reflect.TypeOf(a), err)
}
}
h.Lock()
// get page resources
tree, err := page.GetResourceTree().Do(ctxt, h)
if err != nil {
return fmt.Errorf("unable to get resource tree, got: %v", err)
}
h.frames[tree.Frame.ID] = tree.Frame
h.cur = tree.Frame
for _, c := range tree.ChildFrames {
h.frames[c.Frame.ID] = c.Frame
}
h.Unlock()
h.documentUpdated(ctxt)
return nil
}
// run handles the actual message processing to / from the web socket connection.
func (h *TargetHandler) run(ctxt context.Context) {
defer h.conn.Close()
// add cancel to context
ctxt, cancel := context.WithCancel(ctxt)
defer cancel()
go func() {
defer cancel()
for {
select {
default:
msg, err := h.read()
if err != nil {
return
}
switch {
case msg.Method != "":
h.qevents <- msg
case msg.ID != 0:
h.qres <- msg
default:
log.Printf("ignoring malformed incoming message (missing id or method): %#v", msg)
}
case <-h.detached:
// FIXME: should log when detached, and reason
return
case <-ctxt.Done():
return
}
}
}()
var err error
// process queues
for {
select {
case ev := <-h.qevents:
err = h.processEvent(ctxt, ev)
if err != nil {
log.Printf("could not process event, got: %v", err)
}
case res := <-h.qres:
err = h.processResult(res)
if err != nil {
log.Printf("could not process command result, got: %v", err)
}
case cmd := <-h.qcmd:
err = h.processCommand(cmd)
if err != nil {
log.Printf("could not process command, got: %v", err)
}
case <-ctxt.Done():
return
}
}
}
// read reads a message from the client connection.
func (h *TargetHandler) read() (*Message, error) {
// read
buf, err := h.conn.Read()
if err != nil {
return nil, err
}
log.Printf("-> %s", string(buf))
// unmarshal
msg := new(Message)
err = easyjson.Unmarshal(buf, msg)
if err != nil {
return nil, err
}
return msg, nil
}
// processEvent processes an incoming event.
func (h *TargetHandler) processEvent(ctxt context.Context, msg *Message) error {
if msg == nil {
return ErrChannelClosed
}
// unmarshal
ev, err := UnmarshalMessage(msg)
if err != nil {
return err
}
switch e := ev.(type) {
case *inspector.EventDetached:
h.Lock()
defer h.Unlock()
h.detached <- e
return nil
case *dom.EventDocumentUpdated:
h.domWaitGroup.Wait()
go h.documentUpdated(ctxt)
return nil
}
d := msg.Method.Domain()
if d != "Page" && d != "DOM" {
return nil
}
switch d {
case "Page":
h.pageWaitGroup.Add(1)
go h.pageEvent(ctxt, ev)
case "DOM":
h.domWaitGroup.Add(1)
go h.domEvent(ctxt, ev)
}
return nil
}
// documentUpdated handles the document updated event, retrieving the document
// root for the root frame.
func (h *TargetHandler) documentUpdated(ctxt context.Context) {
f, err := h.WaitFrame(ctxt, emptyFrameID)
if err != nil {
log.Printf("could not get current frame, got: %v", err)
return
}
f.Lock()
defer f.Unlock()
// invalidate nodes
if f.Root != nil {
close(f.Root.Invalidated)
}
f.Nodes = make(map[NodeID]*Node)
f.Root, err = dom.GetDocument().WithPierce(true).Do(ctxt, h)
if err != nil {
log.Printf("error could not retrieve document root for %s, got: %v", f.ID, err)
return
}
f.Root.Invalidated = make(chan struct{})
walk(f.Nodes, f.Root)
}
// processResult processes an incoming command result.
func (h *TargetHandler) processResult(msg *Message) error {
h.resrw.Lock()
defer h.resrw.Unlock()
res, ok := h.res[msg.ID]
if !ok {
panic(fmt.Sprintf("expected result to be present for message id %d", msg.ID))
}
if msg.Error != nil {
res <- msg.Error
} else {
res <- msg.Result
}
delete(h.res, msg.ID)
return nil
}
// processCommand writes a command to the client connection.
func (h *TargetHandler) processCommand(cmd *Message) error {
// FIXME: there are two possible error conditions here, check and
// do some kind of logging ...
buf, err := easyjson.Marshal(cmd)
if err != nil {
return err
}
log.Printf("<- %s", string(buf))
// write
return h.conn.Write(buf)
}
// Execute executes commandType against the endpoint passed to Run, using the
// provided context and the raw JSON encoded params.
//
// Returns a result channel that will receive AT MOST ONE result. A result is
// either the command's result value (as a raw JSON encoded value), or any
// error encountered during operation. After the result (or an error) is passed
// to the returned channel, the channel will be closed.
//
// Note: the returned channel will be closed after the result is read. If the
// passed context finishes prior to receiving the command result, then
// ErrContextDone will be sent to the channel.
func (h *TargetHandler) Execute(ctxt context.Context, commandType MethodType, params easyjson.RawMessage) <-chan interface{} {
ch := make(chan interface{}, 1)
go func() {
defer close(ch)
res := make(chan interface{}, 1)
defer close(res)
// get next id
h.lastm.Lock()
h.last++
id := h.last
h.lastm.Unlock()
// save channel
h.resrw.Lock()
h.res[id] = res
h.resrw.Unlock()
h.qcmd <- &Message{
ID: id,
Method: commandType,
Params: params,
}
select {
case v := <-res:
if v != nil {
ch <- v
} else {
ch <- ErrChannelClosed
}
case <-ctxt.Done():
ch <- ErrContextDone
}
}()
return ch
}
// Listen adds a listener for the specified event types to the appropriate
// domain.
func (h *TargetHandler) Listen(eventTypes ...MethodType) <-chan interface{} {
return nil
}
// GetRoot returns the current top level frame's root document node.
func (h *TargetHandler) GetRoot(ctxt context.Context) (*Node, error) {
// TODO: fix this
ctxt, cancel := context.WithTimeout(ctxt, 10*time.Second)
defer cancel()
var root *Node
loop:
for {
var cur *Frame
select {
default:
h.RLock()
cur = h.cur
if cur != nil {
cur.RLock()
root = cur.Root
cur.RUnlock()
}
h.RUnlock()
if cur != nil && root != nil {
break loop
}
time.Sleep(DefaultCheckDuration)
case <-ctxt.Done():
return nil, ctxt.Err()
}
}
return root, nil
}
// SetActive sets the currently active frame after a successful navigation.
func (h *TargetHandler) SetActive(ctxt context.Context, id FrameID) error {
var err error
// get frame
f, err := h.WaitFrame(ctxt, id)
if err != nil {
return err
}
h.Lock()
defer h.Unlock()
h.cur = f
return nil
}
// WaitFrame waits for a frame to be loaded using the provided context.
func (h *TargetHandler) WaitFrame(ctxt context.Context, id FrameID) (*Frame, error) {
// TODO: fix this
timeout := time.After(10 * time.Second)
loop:
for {
select {
default:
var f *Frame
var ok bool
h.RLock()
if id == emptyFrameID {
f, ok = h.cur, h.cur != nil
} else {
f, ok = h.frames[id]
}
h.RUnlock()
if ok {
return f, nil
}
time.Sleep(DefaultCheckDuration)
case <-ctxt.Done():
return nil, ErrContextDone
case <-timeout:
break loop
}
}
return nil, fmt.Errorf("timeout waiting for frame `%s`", id)
}
// WaitNode waits for a node to be loaded using the provided context.
func (h *TargetHandler) WaitNode(ctxt context.Context, f *Frame, id NodeID) (*Node, error) {
// TODO: fix this
timeout := time.After(10 * time.Second)
loop:
for {
select {
default:
var n *Node
var ok bool
f.RLock()
n, ok = f.Nodes[id]
f.RUnlock()
if n != nil && ok {
return n, nil
}
time.Sleep(DefaultCheckDuration)
case <-ctxt.Done():
return nil, ErrContextDone
case <-timeout:
break loop
}
}
return nil, fmt.Errorf("timeout waiting for node `%d`", id)
}
// pageEvent handles incoming page events.
func (h *TargetHandler) pageEvent(ctxt context.Context, ev interface{}) {
defer h.pageWaitGroup.Done()
var id FrameID
var op FrameOp
switch e := ev.(type) {
case *page.EventFrameNavigated:
h.Lock()
h.frames[e.Frame.ID] = e.Frame
h.Unlock()
return
case *page.EventFrameAttached:
id, op = e.FrameID, frameAttached(e.ParentFrameID)
case *page.EventFrameDetached:
id, op = e.FrameID, frameDetached
case *page.EventFrameStartedLoading:
id, op = e.FrameID, frameStartedLoading
case *page.EventFrameStoppedLoading:
id, op = e.FrameID, frameStoppedLoading
case *page.EventFrameScheduledNavigation:
id, op = e.FrameID, frameScheduledNavigation
case *page.EventFrameClearedScheduledNavigation:
id, op = e.FrameID, frameClearedScheduledNavigation
case *page.EventDomContentEventFired:
return
case *page.EventLoadEventFired:
return
case *page.EventFrameResized:
return
default:
panic(fmt.Sprintf("unhandled page event %s", reflect.TypeOf(ev)))
}
f, err := h.WaitFrame(ctxt, id)
if err != nil {
log.Printf("error could not get frame %s, got: %v", id, err)
return
}
h.Lock()
defer h.Unlock()
f.Lock()
defer f.Unlock()
op(f)
}
// domEvent handles incoming DOM events.
func (h *TargetHandler) domEvent(ctxt context.Context, ev interface{}) {
defer h.domWaitGroup.Done()
// wait current frame
f, err := h.WaitFrame(ctxt, emptyFrameID)
if err != nil {
log.Printf("error processing DOM event %s: error waiting for frame, got: %v", reflect.TypeOf(ev), err)
return
}
var id NodeID
var op NodeOp
switch e := ev.(type) {
case *dom.EventSetChildNodes:
id, op = e.ParentID, setChildNodes(f.Nodes, e.Nodes)
case *dom.EventAttributeModified:
id, op = e.NodeID, attributeModified(e.Name, e.Value)
case *dom.EventAttributeRemoved:
id, op = e.NodeID, attributeRemoved(e.Name)
case *dom.EventInlineStyleInvalidated:
id, op = e.NodeIds[0], inlineStyleInvalidated(e.NodeIds[1:])
case *dom.EventCharacterDataModified:
id, op = e.NodeID, characterDataModified(e.CharacterData)
case *dom.EventChildNodeCountUpdated:
id, op = e.NodeID, childNodeCountUpdated(e.ChildNodeCount)
case *dom.EventChildNodeInserted:
if e.PreviousNodeID != emptyNodeID {
_, err = h.WaitNode(ctxt, f, e.PreviousNodeID)
if err != nil {
return
}
}
id, op = e.ParentNodeID, childNodeInserted(f.Nodes, e.PreviousNodeID, e.Node)
case *dom.EventChildNodeRemoved:
id, op = e.ParentNodeID, childNodeRemoved(f.Nodes, e.NodeID)
case *dom.EventShadowRootPushed:
id, op = e.HostID, shadowRootPushed(f.Nodes, e.Root)
case *dom.EventShadowRootPopped:
id, op = e.HostID, shadowRootPopped(f.Nodes, e.RootID)
case *dom.EventPseudoElementAdded:
id, op = e.ParentID, pseudoElementAdded(f.Nodes, e.PseudoElement)
case *dom.EventPseudoElementRemoved:
id, op = e.ParentID, pseudoElementRemoved(f.Nodes, e.PseudoElementID)
case *dom.EventDistributedNodesUpdated:
id, op = e.InsertionPointID, distributedNodesUpdated(e.DistributedNodes)
case *dom.EventNodeHighlightRequested:
id, op = e.NodeID, nodeHighlightRequested
case *dom.EventInspectNodeRequested:
return
default:
panic(fmt.Sprintf("unhandled node event %s", reflect.TypeOf(ev)))
}
s := strings.TrimPrefix(strings.TrimSuffix(runtime.FuncForPC(reflect.ValueOf(op).Pointer()).Name(), ".func1"), "github.com/knq/chromedp.")
// retrieve node
n, err := h.WaitNode(ctxt, f, id)
if err != nil {
log.Printf("error could not perform (%s) operation on node %d (wait node error), got: %v", s, id, err)
return
}
h.Lock()
defer h.Unlock()
f.Lock()
defer f.Unlock()
n.Lock()
defer n.Unlock()
op(n)
}