Amazon Ad Monitoring in Practice: A Complete Engineering Implementation with Open Claw + the Pangolinfo API Skill
This article walks through a complete implementation of an Amazon ad monitoring system built on Python and the Pangolinfo SERP API. The system asynchronously batch-collects competitors' SP ad placement data on core keywords (rank, ASIN, price, and more), on a tiered schedule (tier-A keywords every 2 hours, tier-B every 6 hours, tier-C every 24 hours). Core components include the data collection layer, the change detection algorithm, and a graded alerting mechanism (CRITICAL/HIGH/MEDIUM/INFO). The architecture spans the keyword list, scheduler, collection, persistence, change detection, and multi-channel alert dispatch.

Preface
Environment: Python 3.10+, latest Open Claw, Pangolinfo SERP API
Prerequisites: Open Claw deployed and a Pangolinfo API key configured (see the earlier articles in this series)
Scope of this article: complete code for the data collection, change detection, and alert notification layers of an Amazon ad monitor
Monitoring Amazon ads is not the same as watching the ACoS in your own seller console; that is only the inward-facing view of the data. The real value lies in the competitive dimension: which ASINs are currently running SP ads on your core keywords, where they rank, and what they are priced at. When something shifts significantly, you want to know immediately.
The Pangolinfo SERP API offers a 98% capture rate for SP ad placements (best in the industry), and Open Claw's tool-calling mechanism lets the whole monitoring workflow be triggered and extended in natural language. This article goes straight to the code.
System Architecture Overview
Keyword list (A/B/C priority tiers)
↓
Open Claw Scheduler (timed triggers: tier A every 2 h / tier B every 6 h / tier C every 24 h)
↓
Pangolinfo SERP API (async batch collection, max_concurrent=8)
↓
SerpSnapshot data model (SQLite/PostgreSQL persistence)
↓
SerpChangeDetector (compare against baseline → emit structured alerts)
↓
LLM alert interpretation (Claude analysis appended to CRITICAL/HIGH alerts)
↓
Multi-channel alert dispatch (Slack / WeCom / daily-digest email)
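
Before the code, here is one way the tier configuration consumed along this pipeline could be shaped. In this sketch, `keyword_tiers` and `price_drop_pct` are the keys that `SerpChangeDetector` actually reads in section 3; the sample keywords and the `poll_interval_hours` key are illustrative assumptions, not from the original setup:

```python
# Sketch of the monitor configuration.
# keyword_tiers and price_drop_pct are read by SerpChangeDetector (section 3);
# poll_interval_hours is an assumed shape for the scheduling side,
# and the keywords themselves are placeholders.
MONITOR_CONFIG = {
    "keyword_tiers": {
        "A": ["wireless earbuds", "bluetooth headphones"],
        "B": ["running headphones", "noise cancelling earbuds"],
        "C": ["earbud case", "headphone stand"],
    },
    "poll_interval_hours": {"A": 2, "B": 6, "C": 24},  # mirrors the diagram above
    "price_drop_pct": 10,  # a Top-3 price drop of >=10% triggers an alert
}
```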
1. Core Data Models
```python
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum


@dataclass
class SponsoredPosition:
    """A single sponsored placement in the SERP."""
    rank: int
    placement: str          # 'top_of_search' | 'rest_of_search'
    asin: str
    brand: str
    title: str
    price: Optional[float]
    rating: Optional[float]
    review_count: Optional[int]


@dataclass
class SerpSnapshot:
    """Complete snapshot of a keyword's sponsored placements at a point in time."""
    keyword: str
    marketplace: str
    captured_at: str        # ISO 8601 UTC
    top_of_search: List[SponsoredPosition] = field(default_factory=list)
    total_sponsored: int = 0
    success: bool = True
    error: Optional[str] = None


class AlertLevel(Enum):
    CRITICAL = "CRITICAL"   # tier-A keyword + a brand-new competitor takes position #1
    HIGH = "HIGH"           # new entrant in the Top 3 / sharp price drop
    MEDIUM = "MEDIUM"       # incumbent leaves the Top 3 / position shuffle
    INFO = "INFO"           # tier-C keyword changes, logged only


@dataclass
class AdAlertEvent:
    keyword: str
    level: AlertLevel
    event_type: str         # top1_changed / new_top3_entrant / top3_exit / price_drop
    summary: str
    affected_asin: str
    previous: Optional[str] = None
    current: Optional[str] = None
    timestamp: str = ""
    llm_analysis: Optional[str] = None
```
2. Data Collection Layer: Async Pangolinfo SERP API Calls
```python
import asyncio
import logging
from datetime import datetime, timezone

import aiohttp

logger = logging.getLogger(__name__)

PANGOLINFO_API_KEY = "your_api_key"
SERP_ENDPOINT = "https://api.pangolinfo.com/v1/serp"


async def fetch_serp_single(
    session: aiohttp.ClientSession,
    keyword: str,
    marketplace: str,
    semaphore: asyncio.Semaphore
) -> SerpSnapshot:
    """Fetch the SERP for a single keyword asynchronously."""
    captured_at = datetime.now(timezone.utc).isoformat()
    async with semaphore:
        headers = {
            "Authorization": f"Bearer {PANGOLINFO_API_KEY}",
            "Content-Type": "application/json"
        }
        payload = {
            "source": "amazon_search",
            "query": keyword,
            "marketplace": marketplace,
            "page": 1,
            "include_sponsored": True,   # key parameter: must be enabled
            "include_organic": False,    # ad monitoring only needs sponsored placements
            "output_format": "json"
        }
        try:
            async with session.post(
                SERP_ENDPOINT, headers=headers, json=payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()

                # Parse the sponsored placements
                positions = []
                for item in data.get("sponsored_results", []):
                    positions.append(SponsoredPosition(
                        rank=item.get("ad_rank", 0),
                        placement=item.get("ad_placement", "unknown"),
                        asin=item.get("asin", ""),
                        brand=item.get("brand", ""),
                        title=item.get("title", ""),
                        price=item.get("price"),
                        rating=item.get("rating"),
                        review_count=item.get("review_count"),
                    ))

                # Filter by placement type, top_of_search first
                top_ads = sorted(
                    [p for p in positions if "top" in p.placement.lower()],
                    key=lambda x: x.rank
                )
                return SerpSnapshot(
                    keyword=keyword, marketplace=marketplace,
                    captured_at=captured_at, top_of_search=top_ads,
                    total_sponsored=len(positions), success=True
                )
        except asyncio.TimeoutError:
            logger.warning(f"Timeout: '{keyword}' @ {marketplace}")
            return SerpSnapshot(keyword=keyword, marketplace=marketplace,
                                captured_at=captured_at,
                                success=False, error="timeout")
        except Exception as e:
            logger.error(f"SERP error '{keyword}': {e}")
            return SerpSnapshot(keyword=keyword, marketplace=marketplace,
                                captured_at=captured_at,
                                success=False, error=str(e))


async def batch_fetch_serp(
    keywords: List[str],
    marketplace: str = "US",
    max_concurrent: int = 8  # empirical value; avoids tripping rate limits
) -> List[SerpSnapshot]:
    """Batch async fetch; ~30-60 s for 50 keywords."""
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_serp_single(session, kw, marketplace, semaphore)
            for kw in keywords
        ]
        results = await asyncio.gather(*tasks)
    ok = sum(1 for r in results if r.success)
    logger.info(f"Batch fetch complete: {ok}/{len(keywords)} succeeded")
    return results
```
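
A minimal one-shot driver for this layer, assuming the models and functions above are in scope (the keyword list is a placeholder):

```python
# Minimal driver: fetch snapshots for a handful of keywords once.
if __name__ == "__main__":
    keywords = ["wireless earbuds", "bluetooth headphones"]  # placeholders
    snapshots = asyncio.run(batch_fetch_serp(keywords, marketplace="US"))
    for snap in snapshots:
        if snap.success:
            print(f"{snap.keyword}: {snap.total_sponsored} sponsored placements, "
                  f"{len(snap.top_of_search)} at top of search")
        else:
            print(f"{snap.keyword}: FAILED ({snap.error})")
```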

Performance reference:
| Scenario | Sequential (sync) | Async concurrent (max=8) |
|---|---|---|
| 10 keywords | ~2 min | ~15 sec |
| 50 keywords | ~8 min | ~45 sec |
| 100 keywords | ~18 min | ~90 sec |
3. Change Detection Layer: Alert Logic
```python
from anthropic import Anthropic


class SerpChangeDetector:
    """
    SERP sponsored-placement change detector.
    Scales alert levels dynamically by keyword priority tier.
    """

    def __init__(self, config: dict):
        self.config = config
        self.llm = Anthropic()
        # Build the keyword -> tier map
        self._tier_map = {}
        for tier, keywords in config.get("keyword_tiers", {}).items():
            for kw in keywords:
                self._tier_map[kw.lower()] = tier

    def _get_tier(self, keyword: str) -> str:
        return self._tier_map.get(keyword.lower(), "B")

    def _scale_alert(self, base: AlertLevel, keyword: str) -> AlertLevel:
        """Escalate alerts for tier-A keywords, downgrade for tier-C."""
        tier = self._get_tier(keyword)
        if tier == "A":
            return {
                AlertLevel.MEDIUM: AlertLevel.HIGH,
                AlertLevel.HIGH: AlertLevel.CRITICAL,
            }.get(base, base)
        if tier == "C":
            return AlertLevel.INFO  # tier-C changes are uniformly downgraded to INFO, log only
        return base

    def compare(
        self,
        current: SerpSnapshot,
        baseline: Optional[SerpSnapshot]
    ) -> List[AdAlertEvent]:
        """
        Compare the current snapshot against the historical baseline
        and return a list of alert events.
        On the first run baseline is None: store the data, emit no alerts.
        """
        if baseline is None:
            return []
        events = []
        kw = current.keyword
        curr_top3 = {p.asin for p in current.top_of_search[:3]}
        base_top3 = {p.asin for p in baseline.top_of_search[:3]}
        curr_p1_asin = current.top_of_search[0].asin if current.top_of_search else None
        base_p1_asin = baseline.top_of_search[0].asin if baseline.top_of_search else None

        # ── Event 1: position #1 changed ─────────────────────────
        if curr_p1_asin and curr_p1_asin != base_p1_asin:
            is_brand_new = curr_p1_asin not in base_top3
            base_lv = AlertLevel.CRITICAL if is_brand_new else AlertLevel.HIGH
            events.append(AdAlertEvent(
                keyword=kw,
                level=self._scale_alert(base_lv, kw),
                event_type="top1_changed",
                summary=(
                    f"Ad position #1 changed: {base_p1_asin} → {curr_p1_asin}"
                    + (" (brand-new competitor)" if is_brand_new else "")
                ),
                affected_asin=curr_p1_asin,
                previous=base_p1_asin,
                current=curr_p1_asin,
                timestamp=current.captured_at
            ))

        # ── Event 2: new entrant in the Top 3 ────────────────────
        for asin in (curr_top3 - base_top3 - {curr_p1_asin}):
            rank = next((p.rank for p in current.top_of_search
                         if p.asin == asin), -1)
            events.append(AdAlertEvent(
                keyword=kw,
                level=self._scale_alert(AlertLevel.HIGH, kw),
                event_type="new_top3_entrant",
                summary=f"New competitor entered the Top 3 ad slots (#{rank}): {asin}",
                affected_asin=asin,
                current=f"#{rank}",
                timestamp=current.captured_at
            ))

        # ── Event 3: incumbent dropped out of the Top 3 ──────────
        for asin in (base_top3 - curr_top3):
            events.append(AdAlertEvent(
                keyword=kw,
                level=self._scale_alert(AlertLevel.MEDIUM, kw),
                event_type="top3_exit",
                summary=f"Competitor {asin} left the Top 3, possibly cutting budget",
                affected_asin=asin,
                previous="in Top 3",
                current="out of Top 3",
                timestamp=current.captured_at
            ))

        # ── Event 4: sharp price drop inside the Top 3 ───────────
        threshold = self.config.get("price_drop_pct", 10) / 100
        base_prices = {p.asin: p.price
                       for p in baseline.top_of_search[:3] if p.price}
        for pos in current.top_of_search[:3]:
            old = base_prices.get(pos.asin)
            if old and pos.price and old > 0:
                drop = (old - pos.price) / old
                if drop >= threshold:
                    events.append(AdAlertEvent(
                        keyword=kw,
                        level=self._scale_alert(AlertLevel.HIGH, kw),
                        event_type="price_drop",
                        summary=(
                            f"Top 3 competitor {pos.asin} cut its price by {drop*100:.1f}%: "
                            f"${old:.2f} → ${pos.price:.2f}"
                        ),
                        affected_asin=pos.asin,
                        previous=f"${old:.2f}",
                        current=f"${pos.price:.2f}",
                        timestamp=current.captured_at
                    ))

        # ── LLM enrichment: add analysis to CRITICAL/HIGH alerts ──
        for event in events:
            if event.level in (AlertLevel.CRITICAL, AlertLevel.HIGH):
                event.llm_analysis = self._llm_enrich(event, current)
        return events

    def _llm_enrich(self, event: AdAlertEvent,
                    snap: SerpSnapshot) -> str:
        """Ask Claude for a business interpretation of the alert (<=100 words)."""
        top3_str = "\n".join([
            f"  #{p.rank}: {p.asin} | {p.brand} | ${p.price or 'N/A'}"
            for p in snap.top_of_search[:3]
        ])
        resp = self.llm.messages.create(
            model="claude-3-7-sonnet-20250219",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": (
                    f"Ad placement alert analysis for keyword \"{event.keyword}\":\n"
                    f"Event: {event.summary}\n"
                    f"Current Top 3:\n{top3_str}\n\n"
                    "In 100 words or fewer, give: 1) the most likely cause; "
                    "2) what the operator should watch. "
                    "Output directly, no preamble."
                )
            }]
        )
        return resp.content[0].text
```
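
Wiring the detector to the collector takes a few lines of glue. A sketch of one monitoring cycle, where `load_baseline` and `save_snapshot` are assumed persistence helpers (not defined in this article) and `AlertDispatcher` is the class introduced in the next section:

```python
# Hypothetical glue for one monitoring cycle.
# load_baseline() and save_snapshot() are assumed persistence helpers.
async def run_monitor_cycle(keywords: List[str],
                            detector: SerpChangeDetector,
                            dispatcher: "AlertDispatcher") -> None:
    snapshots = await batch_fetch_serp(keywords)
    all_events = []
    for snap in snapshots:
        if not snap.success:
            continue  # a failed fetch must not overwrite the baseline
        baseline = load_baseline(snap.keyword, snap.marketplace)  # assumed helper
        all_events.extend(detector.compare(snap, baseline))
        save_snapshot(snap)  # assumed helper: today's snapshot is tomorrow's baseline
    await dispatcher.dispatch(all_events)
```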
4. Alert Dispatch and Deduplication
```python
import sqlite3
from datetime import datetime, timedelta, timezone

import aiohttp


class AlertDispatcher:
    """Alert deduplication + multi-channel dispatch."""

    def __init__(self, db_path: str, slack_webhook: str):
        self.db_path = db_path
        self.webhook = slack_webhook

    def is_duplicate(self, event: AdAlertEvent, cooldown_hours: int = 6) -> bool:
        """Send the same keyword+asin+event_type at most once per 6 hours."""
        # Aware UTC timestamp, so the cutoff string compares consistently
        # against triggered_at values derived from captured_at (also aware ISO 8601)
        cutoff = (datetime.now(timezone.utc) -
                  timedelta(hours=cooldown_hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute("""
                SELECT 1 FROM alert_log
                WHERE keyword = ? AND affected_asin = ?
                  AND event_type = ? AND triggered_at > ? AND sent = 1
                LIMIT 1
            """, (event.keyword, event.affected_asin,
                  event.event_type, cutoff)).fetchone()
        return row is not None

    async def dispatch(self, events: List[AdAlertEvent]):
        to_notify = [
            e for e in events
            if e.level in (AlertLevel.CRITICAL, AlertLevel.HIGH)
            and not self.is_duplicate(e)
        ]
        async with aiohttp.ClientSession() as session:
            for event in to_notify:
                emoji = "🚨" if event.level == AlertLevel.CRITICAL else "⚠️"
                text = (
                    f"{emoji} *[Amazon Ad Monitor]* `{event.level.value}`\n"
                    f"Keyword: `{event.keyword}`\n"
                    f"Event: {event.summary}\n"
                    + (f"Analysis: _{event.llm_analysis}_\n"
                       if event.llm_analysis else "")
                    + f"Time: {event.timestamp[:16]} UTC"
                )
                await session.post(self.webhook, json={"text": text},
                                   timeout=aiohttp.ClientTimeout(total=10))
                # Record the alert as sent
                with sqlite3.connect(self.db_path) as conn:
                    conn.execute("""
                        INSERT INTO alert_log
                            (keyword, affected_asin, event_type,
                             summary, triggered_at, sent)
                        VALUES (?, ?, ?, ?, ?, 1)
                    """, (event.keyword, event.affected_asin,
                          event.event_type, event.summary, event.timestamp))
```
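
The dedup query above assumes an alert_log table that the article never defines. A schema consistent with the columns it reads and writes might look like this sketch:

```python
import sqlite3


def init_alert_log(db_path: str) -> None:
    """Create the alert_log table assumed by AlertDispatcher (sketch)."""
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS alert_log (
                id            INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword       TEXT NOT NULL,
                affected_asin TEXT NOT NULL,
                event_type    TEXT NOT NULL,
                summary       TEXT,
                triggered_at  TEXT NOT NULL,  -- ISO 8601 UTC, same format as captured_at
                sent          INTEGER DEFAULT 0
            )
        """)
        # Index backing the is_duplicate() lookup
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_alert_dedup
            ON alert_log (keyword, affected_asin, event_type, triggered_at)
        """)
```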
5. Common Issues and Solutions
Q: The number of sponsored placements collected is lower than what the actual search page shows. Why?
A: Amazon tags placements at different positions (top/middle/bottom of search) with different field values in the API response. Confirm that `include_sponsored: True` is set in the payload, and check whether filtering down to top_of_search is dropping the other positions. For full coverage, remove the placement filter and sort all sponsored results by rank instead, as in the sketch below.
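
As a sketch of that change, the top_of_search filter inside fetch_serp_single can be swapped for a plain sort over every placement (`SponsoredPosition` is the model from section 1):

```python
from typing import List


def all_sponsored_sorted(positions: List[SponsoredPosition]) -> List[SponsoredPosition]:
    """Full-coverage variant: keep every sponsored placement
    (top / middle / bottom of search) ordered by rank,
    instead of filtering down to top_of_search only."""
    return sorted(positions, key=lambda p: p.rank)
```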
Q: Frequently hitting 429 rate-limit errors?
A: Lower the max_concurrent parameter (from 8 down to 5) and wrap the request in fetch_serp_single with an exponential-backoff retry:
```python
import asyncio
import random

import aiohttp


async def retry_with_backoff(coro_factory, max_retries=3):
    """Retry an async call on HTTP 429 with exponential backoff + jitter.

    Takes a zero-argument factory rather than a coroutine object,
    because a coroutine can only be awaited once.
    Usage: await retry_with_backoff(
        lambda: fetch_serp_single(session, kw, marketplace, semaphore))
    """
    for i in range(max_retries):
        try:
            return await coro_factory()
        except aiohttp.ClientResponseError as e:
            if e.status == 429 and i < max_retries - 1:
                wait = (2 ** i) + random.uniform(0, 1)
                await asyncio.sleep(wait)
            else:
                raise
```
Q: SQLite hits lock waits under frequent writes?
A: Beyond roughly 50 keywords polled every 2 hours, you should consider switching to PostgreSQL. SQLite's WAL mode eases the contention, but changing databases is the real fix:

```python
# SQLite WAL mode (stopgap measure)
conn.execute("PRAGMA journal_mode=WAL")
```
Summary
Key engineering points of the Amazon ad monitoring system:
- Data freshness: the Pangolinfo SERP API collects at minute-level latency with 98% SP ad placement coverage
- Async concurrency: asyncio + aiohttp make batch monitoring 10x+ faster than sequential fetching
- Tiered alerting: keywords are split into A/B/C priority tiers, each with its own polling frequency and alert thresholds
- Deduplication: the same event fires at most once every 6 hours, avoiding alert fatigue
- LLM enrichment: CRITICAL/HIGH alerts carry a Claude business analysis to help operators decide quickly
References
openclaw-skill: https://github.com/Pangolin-spg/openclaw-skill-pangolinfo.git