浏览器自动化利器:Playwright Async 完全指南

🎯 学习目标

  • 理解 Playwright 的核心优势和应用场景
  • 掌握异步编程的基本概念
  • 学会使用 Playwright 实现网页自动化
  • 能够构建稳定的爬虫和测试工具

📖 为什么选择 Playwright?

在这里插入图片描述

主流自动化工具对比

特性 Selenium Puppeteer Playwright
多浏览器 ✅ Chrome/Firefox/Safari ❌ 仅 Chromium ✅ Chrome/Firefox/WebKit
异步支持 ⚠️ 需额外配置 ✅ 原生异步 ✅ 原生异步
移动端 ⚠️ 需要真实设备 ✅ 模拟 ✅ 模拟 + 真实
速度 🐌 较慢 🚀 快 🚀 最快
API 设计 🔧 繁琐 ✨ 简洁 ✨ 最优雅
自动等待 ❌ 需手动 ✅ 部分 ✅ 全面

Playwright 核心优势

1. 自动等待(Auto-wait)
from playwright.async_api import async_playwright

async def test_auto_wait():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        
        # 无需手动等待!Playwright 会自动等待元素可操作
        await page.click('#submit-button')
        await page.fill('#username', 'test')
        
        print("✓ 所有操作自动等待元素就绪")
        
        await browser.close()

传统方式 vs Playwright

# ❌ Selenium 方式
element = driver.find_element(By.ID, "button")
time.sleep(3)  # 硬等待
WebDriverWait(driver, 10).until(
    EC.element_to_be_clickable(element)
)
element.click()

# ✅ Playwright 方式
await page.click("#button")  # 自动等待可点击状态

2. 多浏览器支持
async def test_cross_browser():
    async with async_playwright() as p:
        # 测试 Chrome
        chrome = await p.chromium.launch()
        chrome_page = await chrome.new_page()
        await chrome_page.goto("https://example.com")
        print(f"Chrome: {await chrome_page.title()}")
        
        # 测试 Firefox
        firefox = await p.firefox.launch()
        firefox_page = await firefox.new_page()
        await firefox_page.goto("https://example.com")
        print(f"Firefox: {await firefox_page.title()}")
        
        # 测试 WebKit (Safari)
        webkit = await p.webkit.launch()
        webkit_page = await webkit.new_page()
        await webkit_page.goto("https://example.com")
        print(f"WebKit: {await webkit_page.title()}")

3. 移动设备模拟
from playwright.async_api import devices

async def test_mobile():
    async with async_playwright() as p:
        # 模拟 iPhone 14
        iphone = devices["iPhone 14"]
        browser = await p.chromium.launch()
        context = await browser.new_context(**iphone)
        page = await context.new_page()
        
        await page.goto("https://example.com")
        
        # 检查响应式设计
        is_responsive = await page.evaluate("""() => {
            return window.innerWidth < 768;
        }""")
        
        print(f"✓ 移动端视图:{is_responsive}")
        
        # 模拟 iPad
        ipad = devices["iPad Pro"]
        context = await browser.new_context(**ipad)
        # ... 继续测试

💻 实战:CSDN 自动发布工具

Step 1: 项目结构

csdn-publisher/
├── csdn_publisher.py      # 核心发布逻辑
├── csdn_cover_generator.py # 封面图生成
├── requirements.txt       # 依赖
└── examples/
    └── demo.py           # 使用示例

Step 2: Cookie 登录机制

import json
from pathlib import Path

class CSDNPublisher:
    def __init__(self, headless=True):
        self.headless = headless
        self.browser = None
        self.context = None
        self.page = None
    
    async def initialize(self):
        """初始化浏览器"""
        from playwright.async_api import async_playwright
        
        playwright = await async_playwright().start()
        self.browser = await playwright.chromium.launch(
            headless=self.headless
        )
        
        # 创建上下文(带用户代理)
        self.context = await self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        )
        self.page = await self.context.new_page()
    
    async def login(self, cookie_path="csdn_cookies.json"):
        """
        从文件加载 Cookie 并登录
        
        Args:
            cookie_path: Cookie 文件路径
        """
        if not Path(cookie_path).exists():
            raise FileNotFoundError(f"Cookie 文件不存在:{cookie_path}")
        
        # 加载 Cookie
        with open(cookie_path, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
        
        # 添加到浏览器上下文
        await self.context.add_cookies(cookies)
        
        # 验证登录状态
        await self.page.goto("https://i.csdn.net/")
        await self.page.wait_for_load_state("networkidle")
        
        # 检查是否已登录
        is_logged_in = await self.page.evaluate("""() => {
            return document.cookie.includes('log_Id');
        }""")
        
        if is_logged_in:
            print("✓ Cookie 加载成功,已登录")
        else:
            print("⚠ Cookie 可能已过期")

Step 3: 编辑器定位与内容填充

async def find_markdown_editor(self):
    """
    定位 Markdown 编辑器
    
    CSDN 的编辑器是 contenteditable div,不是 textarea
    """
    # 等待编辑器加载
    try:
        editor_selector = '[class*="editor"]'
        await self.page.wait_for_selector(editor_selector, timeout=10000)
        print(f"✓ 找到编辑器:{editor_selector}")
        return editor_selector
    except:
        print("❌ 未找到编辑器")
        return None
    
async def fill_content(self, content):
    """
    填充 Markdown 内容到编辑器
    
    关键:使用剪贴板 API,而非直接 fill
    """
    # 将内容复制到系统剪贴板
    await self.page.evaluate(f"""() => {{
        navigator.clipboard.writeText(`{content}`);
    }}""")
    
    # 定位编辑器
    editor = await self.page.query_selector('[class*="editor"]')
    
    # 粘贴内容
    await editor.focus()
    await self.page.keyboard.press("Control+V")
    
    # 等待内容渲染
    await asyncio.sleep(2)
    
    # 验证内容
    page_content = await self.page.content()
    if content[:100] in page_content:
        print(f"✓ 内容已粘贴 ({len(content)}字符)")
        return True
    else:
        print("⚠ 内容可能未完全粘贴")
        return False

Step 4: 抗干扰机制

async def click_with_focus_switch(self, selector):
    """
    点击按钮并切换焦点防止页面跳转
    
    CSDN 的按钮会触发新标签页,需要特殊处理
    """
    # 保存当前页面引用
    main_page = self.page
    
    # 定义对话框处理器
    async def handle_dialog(dialog):
        print(f"✓ 已关闭对话框:{dialog.message}")
        await dialog.accept()
    
    self.page.on("dialog", handle_dialog)
    
    # 执行点击(不等待导航)
    await self.page.evaluate(f"""() => {{
        const button = document.querySelector('{selector}');
        if (button) {{
            button.click();
            // 立即移除事件监听器防止跳转
            button.replaceWith(button.cloneNode(true));
        }}
    }}""")
    
    # 短暂等待触发
    await asyncio.sleep(0.5)
    
    # 如果打开了新标签页,切换回来
    pages = self.context.pages
    if len(pages) > 1:
        await pages[-1].close()
        await main_page.bring_to_front()
    
    print(f"✓ 已点击:{selector}")

Step 5: 标签管理

async def add_tags(self, tags):
    """
    添加文章标签
    
    CSDN 限制最多 7 个标签
    """
    if len(tags) > 7:
        print(f"⚠ 标签数量{len(tags)}超过限制,截断为 7 个")
        tags = tags[:7]
    
    for tag in tags:
        # 定位标签输入框
        input_box = await self.page.query_selector(
            'input[placeholder*="搜索"]'
        )
        
        if input_box:
            # 输入标签名
            await input_box.fill(tag)
            await asyncio.sleep(0.3)
            
            # 按 Enter 确认
            await self.page.keyboard.press("Enter")
            await asyncio.sleep(0.5)
            
            print(f"✓ 标签已添加:{tag}")
        else:
            print(f"⚠ 未找到标签输入框")
            break

Step 6: 封面图上传

from playwright.async_api import FileChooser

async def upload_cover_image(self, image_path):
    """
    上传封面图
    
    处理文件选择器弹窗是关键
    """
    # 检查图片是否存在
    if not Path(image_path).exists():
        print(f"❌ 封面图不存在:{image_path}")
        return False
    
    # 点击"从本地上传"按钮
    upload_button = await self.page.query_selector('text=从本地上传')
    
    if not upload_button:
        print("❌ 未找到上传按钮")
        return False
    
    # 设置文件选择器监听
    async with self.page.expect_file_chooser() as fc_info:
        # 强制点击触发文件选择器
        await upload_button.click(force=True)
    
    # 获取文件选择器
    file_chooser = await fc_info.value
    
    # 设置文件
    await file_chooser.set_files(image_path)
    
    print(f"✓ 封面图已上传:{image_path}")
    
    # 点击图片编辑模态框的确认按钮
    confirm_btn = await self.page.query_selector('text=确认上传')
    if confirm_btn:
        await confirm_btn.click()
        print("✓ 已确认上传")
    
    # 等待图片处理完成
    await asyncio.sleep(3)
    
    return True

🔍 调试技巧

技巧 1:截图定位问题

async def debug_snapshot(step_name):
    """保存调试截图"""
    screenshot_path = f"debug_{step_name}.png"
    await self.page.screenshot(path=screenshot_path)
    print(f"✓ 已保存截图:{screenshot_path}")

# 使用
await debug_snapshot("before_publish")

技巧 2:网络请求监控

async def monitor_network():
    """监控网络请求"""
    request_log = []
    
    def log_request(request):
        request_log.append({
            'url': request.url,
            'method': request.method,
            'headers': request.headers
        })
        print(f"📡 请求:{request.method} {request.url}")
    
    page.on("request", log_request)
    
    # 执行操作...
    await page.click("#publish")
    await asyncio.sleep(5)
    
    # 分析日志
    print(f"\n共捕获{len(request_log)}个请求")

技巧 3:Console 日志捕获

async def capture_console():
    """捕获 Console 消息"""
    console_logs = []
    
    def log_console(msg):
        console_logs.append({
            'type': msg.type,
            'text': msg.text
        })
        print(f"🖥️ Console: {msg.type} - {msg.text}")
    
    page.on("console", log_console)
    
    # 执行操作...
    
    # 导出日志
    with open('console_logs.json', 'w') as f:
        json.dump(console_logs, f, indent=2)

⚠️ 常见问题

Q1: 如何防止点击后页面跳转?

A: 使用事件监听器移除evaluate 点击

# 方法 1:移除事件监听器
await page.evaluate("""() => {
    const btn = document.querySelector('#publish');
    const newBtn = btn.cloneNode(true);
    btn.parentNode.replaceChild(newBtn, btn);
    newBtn.click();
}""")

# 方法 2:不等待导航
await page.click('#publish', no_wait_after=True)

Q2: 文件选择器不触发怎么办?

A: 使用expect_file_chooser() 上下文管理器:

# ❌ 错误方式
await upload_button.click()
await asyncio.sleep(1)  # 不可靠

# ✅ 正确方式
async with page.expect_file_chooser() as fc_info:
    await upload_button.click(force=True)

file_chooser = await fc_info.value
await file_chooser.set_files('/path/to/file.png')

Q3: 如何判断发布成功?

A: 多维度判断:

async def check_publish_success():
    """综合判断发布是否成功"""
    
    # 1. 检查 URL
    current_url = page.url
    if "success" in current_url:
        print("✓ URL 包含成功标识")
        return True
    
    # 2. 检查页面文本
    content = await page.content()
    if "发布成功" in content or "审核通过" in content:
        print("✓ 页面包含成功文字")
        return True
    
    # 3. 检查文章 ID
    article_id = await page.evaluate("""() => {
        const urlParams = new URLSearchParams(window.location.search);
        return urlParams.get('articleId');
    }""")
    
    if article_id:
        print(f"✓ 获取到文章 ID: {article_id}")
        return True
    
    return False

🌟 高级技巧

技巧 1:批量发布

async def batch_publish(articles):
    """
    批量发布多篇文章
    
    Args:
        articles: 文章列表 [{'title': '', 'content': ''}, ...]
    """
    results = []
    
    for i, article in enumerate(articles, 1):
        print(f"\n{'='*60}")
        print(f"发布第{i}篇:{article['title']}")
        print('='*60)
        
        try:
            # 发布
            result = await publish_single(article)
            results.append({'title': article['title'], 'status': 'success'})
            
            # 间隔等待
            await asyncio.sleep(5)
            
        except Exception as e:
            print(f"❌ 发布失败:{e}")
            results.append({'title': article['title'], 'status': 'failed'})
    
    return results

技巧 2:重试机制

import asyncio

async def retry_operation(operation, max_retries=3, delay=2):
    """
    重试包装器
    
    Args:
        operation: 要执行的操作(协程函数)
        max_retries: 最大重试次数
        delay: 重试间隔(秒)
    """
    for attempt in range(1, max_retries + 1):
        try:
            print(f"尝试 {attempt}/{max_retries}")
            return await operation()
        except Exception as e:
            if attempt == max_retries:
                raise
            print(f"失败,{delay}秒后重试...")
            await asyncio.sleep(delay)
    
    raise Exception(f"操作失败,已重试{max_retries}次")

# 使用
result = await retry_operation(
    lambda: upload_cover_image('cover.png'),
    max_retries=3,
    delay=5
)

技巧 3:性能优化

# 禁用不必要的资源加载
await context.route("**/*.{png,jpg,jpeg,gif}", lambda route: route.abort())
await context.route("**/*.css", lambda route: route.abort())
await context.route("**/fonts/*", lambda route: route.abort())

# 设置超时
page.set_default_timeout(30000)  # 30 秒

# 减少视口大小
context = await browser.new_context(
    viewport={"width": 1280, "height": 720}
)

🚀 课后作业

基础题

  1. 使用 Playwright 打开百度并搜索"Python"
  2. 截取搜索结果页面的全屏截图

进阶题

  1. 实现一个简单的网页数据抓取器(抓取知乎热榜)
  2. 为 CSDN 发布工具添加邮箱通知功能

挑战题

  1. 实现分布式爬虫系统(多浏览器实例并行)
  2. 添加反反爬虫策略(随机 User-Agent、代理池)

📚 延伸阅读


💬 总结

核心要点

  1. 🎯 自动等待:Playwright 最核心的优势
  2. 🌐 多浏览器:一次编写,多端测试
  3. 📱 移动模拟:完整的响应式测试方案
  4. 🛠️ 调试工具:截图、日志、网络监控

行动清单

  • ✅ 安装 Playwright:pip install playwright
  • ✅ 安装浏览器:playwright install
  • ✅ 尝试第一个自动化脚本
  • ✅ 为你的项目添加工具

下篇预告:《上下文管理艺术:突破 Token 限制》

  • 长文档处理的三种核心策略
  • RAG 检索增强生成实战
  • 万字长文自动生成系统

敬请期待!🎉

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐