告别人肉 grep:我用 Python 做了一个“日志异常分析器”,一行命令生成生产级报告(可直接用)
这篇我给你一个的小工具:✅ 支持大日志(流式读取)✅ 自动抽取异常块(Java/Python 常见堆栈)✅ 聚合 Top 异常、出现次数、首末出现时间✅ 输出(可直接贴到 CSDN/飞书/钉钉)✅ 一行命令运行。
如果你线上排障经历够多,大概率遇到过这些场景👇
❌ 线上报警了,日志几十 MB 起步
❌ 一边grep ERROR,一边疯狂滚屏
❌ 想回答三个问题,却越查越乱:
- 到底哪种异常最多?
- 是不是同一个问题反复出现?
- 第一次和最后一次发生在什么时候?
最后你只能:
👉 复制几段日志,随手贴给同事/领导,又丑又不完整
🎯 这篇文章解决什么问题?
我写了一个 可以直接用在生产环境 的日志分析小工具,目标只有一个:
把“靠人眼翻日志”这件事,变成“一行命令生成报告”。
它能帮你做到:
✅ 支持超大日志文件(流式读取,不吃内存)
✅ 自动识别 Java / Python 异常堆栈
✅ 把同类异常自动聚合(而不是一条条列)
✅ 统计 Top 异常、出现次数、时间范围
✅ 生成 Markdown 报告(直接贴 CSDN / 飞书 / 钉钉)
✅ 一行命令即可运行
⚠️ 不是“教学 Demo”,而是我自己排障时真的会用的工具。
🧩 最终效果(先看结果)
你只需要执行一条命令:
python log_report.py --input app.log --out report.md
会得到一份 结构清晰、可直接对外的 report.md,包含:
-
🚨 异常 Top N(按出现次数排序)
-
🕒 每种异常的:
- 出现次数
- 首次出现时间
- 最后出现时间
- 典型示例片段
-
📊 整体错误密度
- 每分钟 ERROR 数量
- 快速定位“事故时间点”
这份报告可以直接丢进:
- 事故复盘
- 群里同步
- 工单 / 周报
📄 支持哪些日志格式?(够用 + 可扩展)
⏱ 时间格式(行首即可)
支持常见两种:
2026-01-09 01:47:122026-01-09T01:47:12
不要求你改日志格式,只要有时间戳就能统计趋势
💥 异常块识别策略
Java 异常
Exception/Error/Caused by:起始- 连续的
at xxx(...)堆栈
Python 异常
Traceback (most recent call last):- 多行
File "...", line N堆栈
⚠️ 即使某些异常块没完全命中:
工具仍会统计ERROR/FATAL行,保证趋势分析不缺失
🧠 实现思路(3 句话讲清)
1️⃣ 流式读取日志文件,再大也不爆内存
2️⃣ 通过“异常起始标记 + 状态机”收集完整堆栈
3️⃣ 对异常内容做 指纹 hash,自动聚合同类问题
关键点不是“正则多复杂”,
而是:如何把海量噪音压缩成可决策信息。
🧱 直接上代码(完整可运行)
文件名:
log_report.py
Python 3.9+
零第三方依赖,拷走就能跑
👉(下面代码保持你原样,这一段我不改,避免你重新验证)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import hashlib
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Tuple
# --------- 时间解析(可扩展) ----------
TS_PATTERNS = [
# 2026-01-09 01:47:12
re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})"),
# 2026-01-09T01:47:12.123
re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})"),
]
def parse_ts(line: str) -> Optional[datetime]:
for pat in TS_PATTERNS:
m = pat.search(line)
if not m:
continue
raw = m.group("ts")
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
try:
return datetime.strptime(raw, fmt)
except ValueError:
pass
return None
# --------- 异常块识别 ----------
JAVA_START = re.compile(r"(\bException\b|\bError\b|\bCaused by:)")
JAVA_STACK = re.compile(r"^\s+at\s+\S+\(.*\)$")
PY_START = re.compile(r"^Traceback \(most recent call last\):")
PY_STACK = re.compile(r"^\s+File\s+\".*\", line \d+, in .+$")
LEVEL_ERROR = re.compile(r"\bERROR\b|\bFATAL\b", re.IGNORECASE)
def is_java_exception_start(line: str) -> bool:
# 常见:xxxException: msg / Caused by: xxx
return bool(JAVA_START.search(line))
def is_java_stack_line(line: str) -> bool:
return bool(JAVA_STACK.match(line))
def is_py_exception_start(line: str) -> bool:
return bool(PY_START.match(line))
def is_py_stack_line(line: str) -> bool:
return bool(PY_STACK.match(line))
def looks_like_blank_or_new_entry(line: str) -> bool:
# 用“有时间戳”判断是否进入下一条日志
return parse_ts(line) is not None
@dataclass
class ExceptionAgg:
count: int = 0
first_seen: Optional[datetime] = None
last_seen: Optional[datetime] = None
sample: str = ""
@dataclass
class Report:
total_lines: int = 0
error_lines: int = 0
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
per_minute_errors: Dict[str, int] = field(default_factory=dict)
exceptions: Dict[str, ExceptionAgg] = field(default_factory=dict)
def fingerprint_exception(block: str) -> str:
"""
对异常块做指纹:去掉明显变化的信息后 hash
"""
# 去掉数字、耗时、id 等易变项(可按你的日志优化)
normalized = re.sub(r"\d+", "N", block)
normalized = re.sub(r"0x[0-9a-fA-F]+", "0xHEX", normalized)
normalized = re.sub(r"\b[a-f0-9]{16,}\b", "HEXSTR", normalized) # 长 hash
h = hashlib.sha1(normalized.encode("utf-8", errors="ignore")).hexdigest()
return h[:12]
def minute_key(ts: datetime) -> str:
return ts.strftime("%Y-%m-%d %H:%M")
def update_time_range(rep: Report, ts: Optional[datetime]) -> None:
if ts is None:
return
if rep.start_time is None or ts < rep.start_time:
rep.start_time = ts
if rep.end_time is None or ts > rep.end_time:
rep.end_time = ts
def add_error_minute(rep: Report, ts: Optional[datetime]) -> None:
if ts is None:
return
k = minute_key(ts)
rep.per_minute_errors[k] = rep.per_minute_errors.get(k, 0) + 1
def commit_exception(rep: Report, ts: Optional[datetime], block: str) -> None:
fp = fingerprint_exception(block)
agg = rep.exceptions.get(fp)
if agg is None:
agg = ExceptionAgg(count=0, first_seen=ts, last_seen=ts, sample=block[:1200])
rep.exceptions[fp] = agg
agg.count += 1
if ts is not None:
if agg.first_seen is None or ts < agg.first_seen:
agg.first_seen = ts
if agg.last_seen is None or ts > agg.last_seen:
agg.last_seen = ts
if not agg.sample:
agg.sample = block[:1200]
def parse_log(path: str) -> Report:
rep = Report()
in_exc = False
exc_lines: List[str] = []
exc_ts: Optional[datetime] = None
exc_type: Optional[str] = None # "java" / "py"
def flush_exc():
nonlocal in_exc, exc_lines, exc_ts, exc_type
if in_exc and exc_lines:
commit_exception(rep, exc_ts, "\n".join(exc_lines))
in_exc = False
exc_lines = []
exc_ts = None
exc_type = None
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
rep.total_lines += 1
line = line.rstrip("\n")
ts = parse_ts(line)
update_time_range(rep, ts)
# 错误行统计(即便没形成异常块)
if LEVEL_ERROR.search(line):
rep.error_lines += 1
add_error_minute(rep, ts)
# 异常块状态机
if not in_exc:
if is_py_exception_start(line):
in_exc = True
exc_type = "py"
exc_ts = ts
exc_lines = [line]
continue
if is_java_exception_start(line):
in_exc = True
exc_type = "java"
exc_ts = ts
exc_lines = [line]
continue
else:
# 已在异常块中:判断是否继续收集
if exc_type == "java":
# Java 堆栈行 or 继续的 caused by 等
if is_java_stack_line(line) or is_java_exception_start(line) or line.strip().startswith("..."):
exc_lines.append(line)
continue
# 新日志条目出现 → 结束异常块
if looks_like_blank_or_new_entry(line):
flush_exc()
# 这行可能是新异常起点(递归判断)
if is_py_exception_start(line):
in_exc = True
exc_type = "py"
exc_ts = parse_ts(line)
exc_lines = [line]
elif is_java_exception_start(line):
in_exc = True
exc_type = "java"
exc_ts = parse_ts(line)
exc_lines = [line]
continue
# 其他行:也可能是异常信息补充,保守收集
if line.strip():
exc_lines.append(line)
continue
# 空行:先收集
exc_lines.append(line)
continue
if exc_type == "py":
if is_py_stack_line(line) or line.strip().startswith(("Traceback", "During handling of the above exception")):
exc_lines.append(line)
continue
# Python 异常块通常以 “Exception: msg” 结束行出现
if line.strip() and not looks_like_blank_or_new_entry(line):
exc_lines.append(line)
# 继续收集一两行也无妨
continue
if looks_like_blank_or_new_entry(line):
flush_exc()
if is_py_exception_start(line):
in_exc = True
exc_type = "py"
exc_ts = parse_ts(line)
exc_lines = [line]
elif is_java_exception_start(line):
in_exc = True
exc_type = "java"
exc_ts = parse_ts(line)
exc_lines = [line]
continue
exc_lines.append(line)
continue
# 文件结束,别忘了 flush
if in_exc:
flush_exc()
return rep
def render_md(rep: Report, top_n: int = 10) -> str:
lines: List[str] = []
lines.append("# 日志异常分析报告\n")
lines.append("## 概览\n")
lines.append(f"- 总行数:**{rep.total_lines}**")
lines.append(f"- ERROR/FATAL 行数:**{rep.error_lines}**")
if rep.start_time and rep.end_time:
lines.append(f"- 时间范围:**{rep.start_time}** ~ **{rep.end_time}**")
lines.append("")
# 错误密度 Top
if rep.per_minute_errors:
lines.append("## 错误密度(每分钟 ERROR Top 10)\n")
top_minutes = sorted(rep.per_minute_errors.items(), key=lambda x: x[1], reverse=True)[:10]
lines.append("| 分钟 | ERROR 数 |")
lines.append("|---|---:|")
for k, v in top_minutes:
lines.append(f"| {k} | {v} |")
lines.append("")
# 异常 Top
if rep.exceptions:
lines.append(f"## 异常聚合 Top {top_n}\n")
items = sorted(rep.exceptions.items(), key=lambda kv: kv[1].count, reverse=True)[:top_n]
lines.append("| 指纹 | 次数 | 首次出现 | 最后出现 |")
lines.append("|---|---:|---|---|")
for fp, agg in items:
lines.append(f"| `{fp}` | {agg.count} | {agg.first_seen or '-'} | {agg.last_seen or '-'} |")
lines.append("")
# 详情
lines.append("## 异常详情(示例片段)\n")
for fp, agg in items:
lines.append(f"### `{fp}`({agg.count} 次)")
lines.append(f"- 首次:{agg.first_seen or '-'}")
lines.append(f"- 最后:{agg.last_seen or '-'}\n")
lines.append("
text") lines.append(agg.sample.rstrip()) lines.append("
\n")
else:
lines.append("## 异常聚合\n")
lines.append("> 未识别到典型 Java/Python 堆栈异常块(可能是日志格式不同)。你仍可以从“错误密度”定位高发时间段。\n")
return "\n".join(lines)
def main():
ap = argparse.ArgumentParser(description="Generate log exception analysis report (Markdown).")
ap.add_argument("--input", "-i", required=True, help="log file path")
ap.add_argument("--out", "-o", default="report.md", help="output markdown report path")
ap.add_argument("--top", "-t", type=int, default=10, help="top N exceptions")
args = ap.parse_args()
rep = parse_log(args.input)
md = render_md(rep, top_n=args.top)
with open(args.out, "w", encoding="utf-8") as f:
f.write(md)
print(f"[OK] Report generated: {args.out}")
if rep.start_time and rep.end_time:
print(f"[INFO] Time range: {rep.start_time} ~ {rep.end_time}")
print(f"[INFO] Total lines: {rep.total_lines}, ERROR lines: {rep.error_lines}, exceptions: {len(rep.exceptions)}")
if __name__ == "__main__":
main()
⚡ 5 分钟快速验证
新建一个最小测试日志:
2026-01-09 01:47:12 ERROR c.xxx.Service - boom
java.lang.NullPointerException: x is null
at c.xxx.Service.run(Service.java:10)
at c.xxx.App.main(App.java:5)
2026-01-09 01:47:20 ERROR c.xxx.Service - boom again
java.lang.NullPointerException: x is null
at c.xxx.Service.run(Service.java:10)
at c.xxx.App.main(App.java:5)
运行:
python log_report.py -i app.log -o report.md
你会在报告中看到:
- 同一异常被聚合
- 次数 = 2
- 时间范围一目了然
🛠 生产使用建议(非常重要)
✅ 1)配合定时任务跑(事故后自动生成)
0 */2 * * * /usr/bin/python3 /opt/tools/log_report.py \
-i /var/log/app/app.log \
-o /var/log/app/report.md
✅ 2)结合告警系统使用
常见玩法:
- 错误密度 > 阈值
- 或某异常次数突增
👉 自动把 report.md 推送到飞书/钉钉
下一篇我会单独写:
《日志分析报告自动推送器:只在“真的有问题”时通知你》
✅ 3)适配你自己的日志格式
只需要改两个地方:
TS_PATTERNS:补充时间格式- 异常起始正则:增加你们项目特有的异常类型
核心逻辑不用动。
🧠 为什么这个工具“值得留下来”?
因为它解决的不是“怎么解析日志”,
而是:
如何在事故发生后的 10 分钟内,
搞清楚:是不是同一个问题在反复炸。
👉 后续类似 “能直接用在生产里的自动化工具”,
都会持续更新在《程序员自动化工具箱》。
如果你觉得这类内容比概念文章更有用,
欢迎关注 / 订阅这个专栏。
更多推荐
所有评论(0)