在这里插入图片描述

前言

环境:Python 3.10+,Open Claw 最新版,Pangolinfo SERP API
前置条件:Open Claw 部署和 Pangolinfo API Key 已配置(详见前序文章)
本文聚焦:亚马逊广告监控的数据采集、变化检测、告警通知完整代码实现

亚马逊广告监控不等于盯着自己后台的 ACoS——那只是数据的内视角。真正有价值的是竞争维度:核心关键词当前哪些 ASIN 在投 SP 广告、排在什么位置、价格是多少。一旦有显著变化,立刻知道。

Pangolinfo SERP API 提供 SP 广告位 98% 采集率(业界第一),Open Claw 的工具调用机制让整个监控工作流可以用自然语言触发和扩展。本文直接上代码。


系统架构总览

关键词清单(A/B/C 三层优先级)
    ↓
Open Claw Scheduler(定时触发,A类2h/B类6h/C类24h)
    ↓
Pangolinfo SERP API(异步批量采集,max_concurrent=8)
    ↓
SerpSnapshot 数据模型(SQLite/PostgreSQL 持久化)
    ↓
SerpChangeDetector(对比基准线 → 生成结构化告警)
    ↓
LLM 告警解读(CRITICAL/HIGH 级别追加 Claude 分析摘要)
    ↓
多渠道告警分发(Slack/企业微信/日报邮件)

一、核心数据模型

from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum

@dataclass
class SponsoredPosition:
    """A single sponsored (SP ad) slot captured from an Amazon SERP."""
    rank: int           # ad rank within the results page
    placement: str      # 'top_of_search' | 'rest_of_search'
    asin: str
    brand: str
    title: str
    price: Optional[float]          # None when the SERP card shows no price
    rating: Optional[float]
    review_count: Optional[int]

@dataclass
class SerpSnapshot:
    """Complete snapshot of the sponsored slots for one keyword at one moment."""
    keyword: str
    marketplace: str
    captured_at: str                                    # ISO 8601 UTC
    top_of_search: List[SponsoredPosition] = field(default_factory=list)
    total_sponsored: int = 0    # count across ALL placements, not only top_of_search
    success: bool = True        # False when the fetch failed; see `error`
    error: Optional[str] = None # e.g. "timeout" or the exception text

class AlertLevel(Enum):
    """Severity of an ad-monitoring alert."""
    CRITICAL = "CRITICAL"   # tier-A keyword + brand-new competitor takes slot #1
    HIGH     = "HIGH"       # new Top-3 entrant / significant price drop
    MEDIUM   = "MEDIUM"     # incumbent Top-3 competitor gone / position shift
    INFO     = "INFO"       # tier-C keyword change, record only

@dataclass
class AdAlertEvent:
    """One detected SERP change, ready for de-duplication and dispatch."""
    keyword: str
    level: AlertLevel
    event_type: str      # top1_changed / new_top3 / top3_exit / price_drop
    summary: str                        # human-readable one-liner
    affected_asin: str
    previous: Optional[str] = None      # state before the change (free-form)
    current: Optional[str] = None       # state after the change (free-form)
    timestamp: str = ""                 # ISO 8601 UTC of the triggering snapshot
    llm_analysis: Optional[str] = None  # filled only for CRITICAL/HIGH events

二、数据采集层:异步 Pangolinfo SERP API 调用

import asyncio
import aiohttp
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# NOTE(review): placeholder credential — load from an environment variable or a
# secret store in production instead of hard-coding it in source.
PANGOLINFO_API_KEY = "your_api_key"
SERP_ENDPOINT = "https://api.pangolinfo.com/v1/serp"


async def fetch_serp_single(
    session: aiohttp.ClientSession,
    keyword: str,
    marketplace: str,
    semaphore: asyncio.Semaphore
) -> SerpSnapshot:
    """Fetch the SERP for one keyword and return it as a SerpSnapshot.

    Never raises: timeouts and any other failures are converted into a
    snapshot with success=False and an `error` description, so a batch
    ``asyncio.gather`` is not aborted by one bad keyword.
    """
    # Request pieces are invariant — build them before waiting on the semaphore.
    headers = {
        "Authorization": f"Bearer {PANGOLINFO_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "source": "amazon_search",
        "query": keyword,
        "marketplace": marketplace,
        "page": 1,
        "include_sponsored": True,  # required: sponsored slots are the whole point
        "include_organic": False,   # ad monitoring only needs sponsored data
        "output_format": "json"
    }

    async with semaphore:
        # Timestamp AFTER acquiring the semaphore so captured_at reflects the
        # actual request time, not how long this task queued behind others.
        captured_at = datetime.now(timezone.utc).isoformat()

        try:
            async with session.post(
                SERP_ENDPOINT, headers=headers, json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()

            # Parse the sponsored slots out of the response body.
            positions = [
                SponsoredPosition(
                    rank=item.get("ad_rank", 0),
                    placement=item.get("ad_placement", "unknown"),
                    asin=item.get("asin", ""),
                    brand=item.get("brand", ""),
                    title=item.get("title", ""),
                    price=item.get("price"),
                    rating=item.get("rating"),
                    review_count=item.get("review_count"),
                )
                for item in data.get("sponsored_results", [])
            ]

            # Keep only top-of-search placements, ordered by rank.
            top_ads = sorted(
                (p for p in positions if "top" in p.placement.lower()),
                key=lambda p: p.rank
            )

            return SerpSnapshot(
                keyword=keyword, marketplace=marketplace,
                captured_at=captured_at, top_of_search=top_ads,
                total_sponsored=len(positions), success=True
            )

        except asyncio.TimeoutError:
            # Lazy %-args: formatting is skipped when WARNING is disabled.
            logger.warning("Timeout: '%s' @ %s", keyword, marketplace)
            return SerpSnapshot(keyword=keyword, marketplace=marketplace,
                               captured_at=captured_at,
                               success=False, error="timeout")
        except Exception as e:
            # Boundary handler: convert any failure into a failed snapshot.
            logger.error("SERP error '%s': %s", keyword, e)
            return SerpSnapshot(keyword=keyword, marketplace=marketplace,
                               captured_at=captured_at,
                               success=False, error=str(e))


async def batch_fetch_serp(
    keywords: List[str],
    marketplace: str = "US",
    max_concurrent: int = 8  # empirical value; stays under the API rate limit
) -> List[SerpSnapshot]:
    """Fetch SERPs for many keywords concurrently (~30-60s for 50 keywords).

    Returns one SerpSnapshot per keyword, in input order. Individual
    failures come back as snapshots with success=False — fetch_serp_single
    never raises — so no single keyword can abort the whole batch.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_serp_single(session, kw, marketplace, semaphore)
            for kw in keywords
        ]
        results = await asyncio.gather(*tasks)

    ok = sum(1 for r in results if r.success)
    # Lazy %-args instead of an eager f-string.
    logger.info("批量采集完成:%d/%d 成功", ok, len(keywords))
    return results

在这里插入图片描述

性能参考数据

| 场景 | 同步顺序 | 异步并发(max=8) |
| --- | --- | --- |
| 10 个关键词 | ~2 min | ~15 sec |
| 50 个关键词 | ~8 min | ~45 sec |
| 100 个关键词 | ~18 min | ~90 sec |

三、变化检测层:告警逻辑实现

from anthropic import Anthropic

class SerpChangeDetector:
    """
    SERP sponsored-slot change detector.

    Compares the current snapshot against a stored baseline and emits
    structured AdAlertEvent objects; alert levels are scaled by the
    keyword's priority tier (A/B/C) taken from the config.
    """

    def __init__(self, config: dict):
        self.config = config
        self.llm = Anthropic()
        # Build the keyword -> tier lookup once, lower-cased for matching.
        self._tier_map = {}
        for tier, keywords in config.get("keyword_tiers", {}).items():
            for kw in keywords:
                self._tier_map[kw.lower()] = tier

    def _get_tier(self, keyword: str) -> str:
        # Keywords not listed in any tier default to "B".
        return self._tier_map.get(keyword.lower(), "B")

    def _scale_alert(self, base: AlertLevel, keyword: str) -> AlertLevel:
        """Escalate alerts for tier-A keywords; demote tier-C to INFO."""
        tier = self._get_tier(keyword)
        if tier == "A":
            return {
                AlertLevel.MEDIUM: AlertLevel.HIGH,
                AlertLevel.HIGH: AlertLevel.CRITICAL,
            }.get(base, base)
        if tier == "C":
            return AlertLevel.INFO  # tier-C: uniformly demoted, record only
        return base

    def compare(
        self,
        current: SerpSnapshot,
        baseline: Optional[SerpSnapshot]
    ) -> List[AdAlertEvent]:
        """
        Diff the current snapshot against the historical baseline and
        return a list of alert events.

        On the first run baseline is None: only store data, emit no alerts.
        """
        if baseline is None:
            return []

        events = []
        kw = current.keyword

        curr_top3 = {p.asin for p in current.top_of_search[:3]}
        base_top3 = {p.asin for p in baseline.top_of_search[:3]}
        curr_p1_asin = current.top_of_search[0].asin if current.top_of_search else None
        base_p1_asin = baseline.top_of_search[0].asin if baseline.top_of_search else None

        # ── Event 1: slot #1 changed ────────────────────────────────
        if curr_p1_asin and curr_p1_asin != base_p1_asin:
            is_brand_new = curr_p1_asin not in base_top3
            base_lv = AlertLevel.CRITICAL if is_brand_new else AlertLevel.HIGH
            events.append(AdAlertEvent(
                keyword=kw,
                level=self._scale_alert(base_lv, kw),
                event_type="top1_changed",
                summary=(
                    # FIX: old/new ASINs were concatenated with no separator,
                    # producing an unreadable summary; use the same "→" style
                    # as the price_drop event.
                    f"广告第1位变化:{base_p1_asin} → {curr_p1_asin}"
                    + ("(全新竞品)" if is_brand_new else "")
                ),
                affected_asin=curr_p1_asin,
                previous=base_p1_asin,
                current=curr_p1_asin,
                timestamp=current.captured_at
            ))

        # ── Event 2: new Top-3 entrant ──────────────────────────────
        # Exclude the new #1: it is already covered by event 1.
        for asin in (curr_top3 - base_top3 - {curr_p1_asin}):
            rank = next((p.rank for p in current.top_of_search
                         if p.asin == asin), -1)
            events.append(AdAlertEvent(
                keyword=kw,
                level=self._scale_alert(AlertLevel.HIGH, kw),
                event_type="new_top3_entrant",
                summary=f"新竞品进入 Top3 广告位(#{rank}):{asin}",
                affected_asin=asin,
                current=f"#{rank}",
                timestamp=current.captured_at
            ))

        # ── Event 3: incumbent left the Top-3 ───────────────────────
        for asin in (base_top3 - curr_top3):
            events.append(AdAlertEvent(
                keyword=kw,
                level=self._scale_alert(AlertLevel.MEDIUM, kw),
                event_type="top3_exit",
                summary=f"竞品 {asin} 退出 Top3,可能缩减预算",
                affected_asin=asin,
                previous="Top3内",
                current="Top3外",
                timestamp=current.captured_at
            ))

        # ── Event 4: significant price drop inside the Top-3 ────────
        threshold = self.config.get("price_drop_pct", 10) / 100
        base_prices = {p.asin: p.price
                       for p in baseline.top_of_search[:3] if p.price}
        for pos in current.top_of_search[:3]:
            old = base_prices.get(pos.asin)
            if old and pos.price and old > 0:
                drop = (old - pos.price) / old
                if drop >= threshold:
                    events.append(AdAlertEvent(
                        keyword=kw,
                        level=self._scale_alert(AlertLevel.HIGH, kw),
                        event_type="price_drop",
                        summary=(
                            f"Top3 竞品 {pos.asin} 降价 {drop*100:.1f}%:"
                            f"${old:.2f} → ${pos.price:.2f}"
                        ),
                        affected_asin=pos.asin,
                        previous=f"${old:.2f}",
                        current=f"${pos.price:.2f}",
                        timestamp=current.captured_at
                    ))

        # ── LLM enrichment: analysis for CRITICAL/HIGH alerts only ──
        for event in events:
            if event.level in (AlertLevel.CRITICAL, AlertLevel.HIGH):
                event.llm_analysis = self._llm_enrich(event, current)

        return events

    def _llm_enrich(self, event: AdAlertEvent,
                    snap: SerpSnapshot) -> Optional[str]:
        """Ask Claude for a short (≤100 chars) business read on the alert.

        Best-effort: any API failure is logged and None is returned, so a
        transient LLM outage can never break change detection itself
        (llm_analysis is Optional on AdAlertEvent).
        """
        top3_str = "\n".join([
            f"  #{p.rank}: {p.asin} | {p.brand} | ${p.price or 'N/A'}"
            for p in snap.top_of_search[:3]
        ])
        try:
            resp = self.llm.messages.create(
                model="claude-3-7-sonnet-20250219",
                max_tokens=200,
                messages=[{
                    "role": "user",
                    "content": (
                        f"关键词「{event.keyword}」广告位告警分析:\n"
                        f"事件:{event.summary}\n"
                        f"当前 Top3:\n{top3_str}\n\n"
                        "请用100字以内给出:1) 最可能的原因;2) 运营需关注的事项。"
                        "直接输出,无需开场白。"
                    )
                }]
            )
            return resp.content[0].text
        except Exception as e:
            # FIX: an unguarded network call per event meant one Anthropic
            # failure aborted the whole compare() run.
            logger.warning("LLM enrich failed for '%s': %s", event.keyword, e)
            return None

四、告警分发与去重

import sqlite3
import aiohttp
from datetime import timedelta

class AlertDispatcher:
    """Alert de-duplication + multi-channel (Slack webhook) dispatch."""

    def __init__(self, db_path: str, slack_webhook: str):
        self.db_path = db_path
        self.webhook = slack_webhook

    def is_duplicate(self, event: AdAlertEvent, cooldown_hours: int = 6) -> bool:
        """Return True if the same keyword+asin+event_type was already sent
        within the cooldown window (default: 6 hours)."""
        # FIX: use a timezone-aware cutoff so its ISO string is consistent
        # with the aware timestamps the collector writes (datetime.now(
        # timezone.utc)); datetime.utcnow() is naive and deprecated in 3.12+.
        cutoff = (datetime.now(timezone.utc) -
                  timedelta(hours=cooldown_hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute("""
                SELECT 1 FROM alert_log
                WHERE keyword = ? AND affected_asin = ?
                  AND event_type = ? AND triggered_at > ? AND sent = 1
                LIMIT 1
            """, (event.keyword, event.affected_asin,
                  event.event_type, cutoff)).fetchone()
        return row is not None

    async def dispatch(self, events: List[AdAlertEvent]):
        """Send CRITICAL/HIGH, non-duplicate alerts to Slack and log them."""
        to_notify = [
            e for e in events
            if e.level in (AlertLevel.CRITICAL, AlertLevel.HIGH)
            and not self.is_duplicate(e)
        ]

        async with aiohttp.ClientSession() as session:
            for event in to_notify:
                emoji = "🚨" if event.level == AlertLevel.CRITICAL else "⚠️"
                text = (
                    f"{emoji} *[亚马逊广告监控]* `{event.level.value}`\n"
                    f"关键词:`{event.keyword}`\n"
                    f"事件:{event.summary}\n"
                    + (f"分析:_{event.llm_analysis}_\n"
                       if event.llm_analysis else "")
                    + f"时间:{event.timestamp[:16]} UTC"
                )
                # FIX: `async with` releases the response/connection back to
                # the pool; a bare `await session.post(...)` leaks it.
                async with session.post(
                        self.webhook, json={"text": text},
                        timeout=aiohttp.ClientTimeout(total=10)):
                    pass

                # Record as sent so the cooldown window can de-duplicate.
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute("""
                        INSERT INTO alert_log
                        (keyword, affected_asin, event_type,
                         summary, triggered_at, sent)
                        VALUES (?, ?, ?, ?, ?, 1)
                    """, (event.keyword, event.affected_asin,
                          event.event_type, event.summary, event.timestamp))

五、常见问题与解决方案

Q:抓取的广告位数量比实际搜索页少,怎么回事?

A:亚马逊不同位置的广告位(顶部/中部/底部)在 API 返回时有不同字段标识。确认 payload 中 include_sponsored: True 已开启,并检查是否只过滤了 top_of_search 导致遗漏其他位置。如需完整覆盖,去掉 placement 过滤逻辑,改为对全部 sponsored 结果按 rank 排序。

Q:频繁触发 429 限流错误?

A:降低 max_concurrent 参数(从 8 降到 5),并在 fetch_serp_single 的请求调用外包一层指数退避重试:

import random

async def retry_with_backoff(coro, max_retries=3):
    """Await with exponential backoff + jitter on HTTP 429.

    BUG FIX: a coroutine object can only be awaited once, so retrying the
    same object raised "cannot reuse already awaited coroutine" on every
    second attempt. Pass a zero-argument factory (the async function
    itself, a lambda, or functools.partial) so a fresh coroutine is built
    per attempt. A bare coroutine object is still accepted and works for
    the first attempt, preserving the old call style.

    Raises the last ClientResponseError when retries are exhausted, and
    re-raises immediately for any non-429 status.
    """
    for attempt in range(max_retries):
        # Build a fresh awaitable per attempt when given a factory.
        awaitable = coro() if callable(coro) else coro
        try:
            return await awaitable
        except aiohttp.ClientResponseError as e:
            if e.status == 429 and attempt < max_retries - 1:
                # Backoff windows: 1-2s, 2-3s, 4-5s, ... (jitter avoids
                # synchronized retries across concurrent tasks).
                wait = (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(wait)
            else:
                raise

Q:SQLite 高频写入出现锁等待?

A:超过 50 个关键词、2小时一次的监控频率就应考虑切换 PostgreSQL。SQLite 的 WAL 模式可以缓解,但根本解决需换数据库:

# SQLite WAL 模式(临时方案)
conn.execute("PRAGMA journal_mode=WAL")

总结

亚马逊广告监控系统的工程实现关键点:

  1. 数据实时性:Pangolinfo SERP API 分钟级采集,SP 广告位 98% 覆盖率
  2. 异步并发:asyncio + aiohttp,批量监控效率提升 10 倍以上
  3. 分层告警:关键词按优先级分 A/B/C 层,不同频率、不同告警阈值
  4. 去重逻辑:同一事件 6 小时内只发一次,避免告警疲劳
  5. LLM 增强:CRITICAL/HIGH 告警追加 Claude 业务分析,帮助运营快速决策

参考资料

openclaw-skill:https://github.com/Pangolin-spg/openclaw-skill-pangolinfo.git

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐