如果你线上排障经历够多,大概率遇到过这些场景👇

❌ 线上报警了,日志几十 MB 起步
❌ 一边 grep ERROR,一边疯狂滚屏
❌ 想回答三个问题,却越查越乱:

  • 到底哪种异常最多?
  • 是不是同一个问题反复出现?
  • 第一次和最后一次发生在什么时候?

最后你只能:
👉 复制几段日志,随手贴给同事/领导,又丑又不完整


🎯 这篇文章解决什么问题?

我写了一个 可以直接用在生产环境 的日志分析小工具,目标只有一个:

把“靠人眼翻日志”这件事,变成“一行命令生成报告”。

它能帮你做到:

支持超大日志文件(流式读取,不吃内存)
自动识别 Java / Python 异常堆栈
把同类异常自动聚合(而不是一条条列)
统计 Top 异常、出现次数、时间范围
生成 Markdown 报告(直接贴 CSDN / 飞书 / 钉钉)
一行命令即可运行

⚠️ 不是“教学 Demo”,而是我自己排障时真的会用的工具


🧩 最终效果(先看结果)

你只需要执行一条命令:

python log_report.py --input app.log --out report.md

会得到一份 结构清晰、可直接对外的 report.md,包含:

  • 🚨 异常 Top N(按出现次数排序)

  • 🕒 每种异常的:

    • 出现次数
    • 首次出现时间
    • 最后出现时间
    • 典型示例片段
  • 📊 整体错误密度

    • 每分钟 ERROR 数量
    • 快速定位“事故时间点”

这份报告可以直接丢进:

  • 事故复盘
  • 群里同步
  • 工单 / 周报

📄 支持哪些日志格式?(够用 + 可扩展)

⏱ 时间格式(行首即可)

支持常见两种:

  • 2026-01-09 01:47:12
  • 2026-01-09T01:47:12

不要求你改日志格式,只要有时间戳就能统计趋势


💥 异常块识别策略

Java 异常

  • Exception / Error / Caused by: 起始
  • 连续的 at xxx(...) 堆栈

Python 异常

  • Traceback (most recent call last):
  • 多行 File "...", line N 堆栈

⚠️ 即使某些异常块没完全命中:
工具仍会统计 ERROR/FATAL 行,保证趋势分析不缺失


🧠 实现思路(3 句话讲清)

1️⃣ 流式读取日志文件,再大也不爆内存
2️⃣ 通过“异常起始标记 + 状态机”收集完整堆栈
3️⃣ 对异常内容做 指纹 hash,自动聚合同类问题

关键点不是“正则多复杂”,
而是:如何把海量噪音压缩成可决策信息


🧱 直接上代码(完整可运行)

文件名:log_report.py
Python 3.9+
零第三方依赖,拷走就能跑

👉(下面代码保持你原样,这一段我不改,避免你重新验证)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import hashlib
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Tuple


# --------- 时间解析(可扩展) ----------
TS_PATTERNS = [
    # 2026-01-09 01:47:12
    re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})"),
    # 2026-01-09T01:47:12.123
    re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})"),
]

def parse_ts(line: str) -> Optional[datetime]:
    for pat in TS_PATTERNS:
        m = pat.search(line)
        if not m:
            continue
        raw = m.group("ts")
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"):
            try:
                return datetime.strptime(raw, fmt)
            except ValueError:
                pass
    return None


# --------- 异常块识别 ----------
JAVA_START = re.compile(r"(\bException\b|\bError\b|\bCaused by:)")
JAVA_STACK = re.compile(r"^\s+at\s+\S+\(.*\)$")
PY_START = re.compile(r"^Traceback \(most recent call last\):")
PY_STACK = re.compile(r"^\s+File\s+\".*\", line \d+, in .+$")

LEVEL_ERROR = re.compile(r"\bERROR\b|\bFATAL\b", re.IGNORECASE)


def is_java_exception_start(line: str) -> bool:
    # 常见:xxxException: msg / Caused by: xxx
    return bool(JAVA_START.search(line))

def is_java_stack_line(line: str) -> bool:
    return bool(JAVA_STACK.match(line))

def is_py_exception_start(line: str) -> bool:
    return bool(PY_START.match(line))

def is_py_stack_line(line: str) -> bool:
    return bool(PY_STACK.match(line))

def looks_like_blank_or_new_entry(line: str) -> bool:
    # 用“有时间戳”判断是否进入下一条日志
    return parse_ts(line) is not None


@dataclass
class ExceptionAgg:
    count: int = 0
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    sample: str = ""


@dataclass
class Report:
    total_lines: int = 0
    error_lines: int = 0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    per_minute_errors: Dict[str, int] = field(default_factory=dict)
    exceptions: Dict[str, ExceptionAgg] = field(default_factory=dict)


def fingerprint_exception(block: str) -> str:
    """
    对异常块做指纹:去掉明显变化的信息后 hash
    """
    # 去掉数字、耗时、id 等易变项(可按你的日志优化)
    normalized = re.sub(r"\d+", "N", block)
    normalized = re.sub(r"0x[0-9a-fA-F]+", "0xHEX", normalized)
    normalized = re.sub(r"\b[a-f0-9]{16,}\b", "HEXSTR", normalized)  # 长 hash
    h = hashlib.sha1(normalized.encode("utf-8", errors="ignore")).hexdigest()
    return h[:12]


def minute_key(ts: datetime) -> str:
    return ts.strftime("%Y-%m-%d %H:%M")


def update_time_range(rep: Report, ts: Optional[datetime]) -> None:
    if ts is None:
        return
    if rep.start_time is None or ts < rep.start_time:
        rep.start_time = ts
    if rep.end_time is None or ts > rep.end_time:
        rep.end_time = ts


def add_error_minute(rep: Report, ts: Optional[datetime]) -> None:
    if ts is None:
        return
    k = minute_key(ts)
    rep.per_minute_errors[k] = rep.per_minute_errors.get(k, 0) + 1


def commit_exception(rep: Report, ts: Optional[datetime], block: str) -> None:
    fp = fingerprint_exception(block)
    agg = rep.exceptions.get(fp)
    if agg is None:
        agg = ExceptionAgg(count=0, first_seen=ts, last_seen=ts, sample=block[:1200])
        rep.exceptions[fp] = agg
    agg.count += 1
    if ts is not None:
        if agg.first_seen is None or ts < agg.first_seen:
            agg.first_seen = ts
        if agg.last_seen is None or ts > agg.last_seen:
            agg.last_seen = ts
    if not agg.sample:
        agg.sample = block[:1200]


def parse_log(path: str) -> Report:
    rep = Report()

    in_exc = False
    exc_lines: List[str] = []
    exc_ts: Optional[datetime] = None
    exc_type: Optional[str] = None  # "java" / "py"

    def flush_exc():
        nonlocal in_exc, exc_lines, exc_ts, exc_type
        if in_exc and exc_lines:
            commit_exception(rep, exc_ts, "\n".join(exc_lines))
        in_exc = False
        exc_lines = []
        exc_ts = None
        exc_type = None

    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            rep.total_lines += 1
            line = line.rstrip("\n")
            ts = parse_ts(line)
            update_time_range(rep, ts)

            # 错误行统计(即便没形成异常块)
            if LEVEL_ERROR.search(line):
                rep.error_lines += 1
                add_error_minute(rep, ts)

            # 异常块状态机
            if not in_exc:
                if is_py_exception_start(line):
                    in_exc = True
                    exc_type = "py"
                    exc_ts = ts
                    exc_lines = [line]
                    continue

                if is_java_exception_start(line):
                    in_exc = True
                    exc_type = "java"
                    exc_ts = ts
                    exc_lines = [line]
                    continue

            else:
                # 已在异常块中:判断是否继续收集
                if exc_type == "java":
                    # Java 堆栈行 or 继续的 caused by 等
                    if is_java_stack_line(line) or is_java_exception_start(line) or line.strip().startswith("..."):
                        exc_lines.append(line)
                        continue
                    # 新日志条目出现 → 结束异常块
                    if looks_like_blank_or_new_entry(line):
                        flush_exc()
                        # 这行可能是新异常起点(递归判断)
                        if is_py_exception_start(line):
                            in_exc = True
                            exc_type = "py"
                            exc_ts = parse_ts(line)
                            exc_lines = [line]
                        elif is_java_exception_start(line):
                            in_exc = True
                            exc_type = "java"
                            exc_ts = parse_ts(line)
                            exc_lines = [line]
                        continue
                    # 其他行:也可能是异常信息补充,保守收集
                    if line.strip():
                        exc_lines.append(line)
                        continue
                    # 空行:先收集
                    exc_lines.append(line)
                    continue

                if exc_type == "py":
                    if is_py_stack_line(line) or line.strip().startswith(("Traceback", "During handling of the above exception")):
                        exc_lines.append(line)
                        continue
                    # Python 异常块通常以 “Exception: msg” 结束行出现
                    if line.strip() and not looks_like_blank_or_new_entry(line):
                        exc_lines.append(line)
                        # 继续收集一两行也无妨
                        continue
                    if looks_like_blank_or_new_entry(line):
                        flush_exc()
                        if is_py_exception_start(line):
                            in_exc = True
                            exc_type = "py"
                            exc_ts = parse_ts(line)
                            exc_lines = [line]
                        elif is_java_exception_start(line):
                            in_exc = True
                            exc_type = "java"
                            exc_ts = parse_ts(line)
                            exc_lines = [line]
                        continue
                    exc_lines.append(line)
                    continue

    # 文件结束,别忘了 flush
    if in_exc:
        flush_exc()

    return rep


def render_md(rep: Report, top_n: int = 10) -> str:
    lines: List[str] = []
    lines.append("# 日志异常分析报告\n")
    lines.append("## 概览\n")
    lines.append(f"- 总行数:**{rep.total_lines}**")
    lines.append(f"- ERROR/FATAL 行数:**{rep.error_lines}**")
    if rep.start_time and rep.end_time:
        lines.append(f"- 时间范围:**{rep.start_time}** ~ **{rep.end_time}**")
    lines.append("")

    # 错误密度 Top
    if rep.per_minute_errors:
        lines.append("## 错误密度(每分钟 ERROR Top 10)\n")
        top_minutes = sorted(rep.per_minute_errors.items(), key=lambda x: x[1], reverse=True)[:10]
        lines.append("| 分钟 | ERROR 数 |")
        lines.append("|---|---:|")
        for k, v in top_minutes:
            lines.append(f"| {k} | {v} |")
        lines.append("")

    # 异常 Top
    if rep.exceptions:
        lines.append(f"## 异常聚合 Top {top_n}\n")
        items = sorted(rep.exceptions.items(), key=lambda kv: kv[1].count, reverse=True)[:top_n]
        lines.append("| 指纹 | 次数 | 首次出现 | 最后出现 |")
        lines.append("|---|---:|---|---|")
        for fp, agg in items:
            lines.append(f"| `{fp}` | {agg.count} | {agg.first_seen or '-'} | {agg.last_seen or '-'} |")
        lines.append("")

        # 详情
        lines.append("## 异常详情(示例片段)\n")
        for fp, agg in items:
            lines.append(f"### `{fp}`({agg.count} 次)")
            lines.append(f"- 首次:{agg.first_seen or '-'}")
            lines.append(f"- 最后:{agg.last_seen or '-'}\n")
            lines.append("
text") lines.append(agg.sample.rstrip()) lines.append("
\n")
    else:
        lines.append("## 异常聚合\n")
        lines.append("> 未识别到典型 Java/Python 堆栈异常块(可能是日志格式不同)。你仍可以从“错误密度”定位高发时间段。\n")

    return "\n".join(lines)


def main():
    ap = argparse.ArgumentParser(description="Generate log exception analysis report (Markdown).")
    ap.add_argument("--input", "-i", required=True, help="log file path")
    ap.add_argument("--out", "-o", default="report.md", help="output markdown report path")
    ap.add_argument("--top", "-t", type=int, default=10, help="top N exceptions")
    args = ap.parse_args()

    rep = parse_log(args.input)
    md = render_md(rep, top_n=args.top)

    with open(args.out, "w", encoding="utf-8") as f:
        f.write(md)

    print(f"[OK] Report generated: {args.out}")
    if rep.start_time and rep.end_time:
        print(f"[INFO] Time range: {rep.start_time} ~ {rep.end_time}")
    print(f"[INFO] Total lines: {rep.total_lines}, ERROR lines: {rep.error_lines}, exceptions: {len(rep.exceptions)}")


if __name__ == "__main__":
    main()

⚡ 5 分钟快速验证

新建一个最小测试日志:

2026-01-09 01:47:12 ERROR c.xxx.Service - boom
java.lang.NullPointerException: x is null
    at c.xxx.Service.run(Service.java:10)
    at c.xxx.App.main(App.java:5)

2026-01-09 01:47:20 ERROR c.xxx.Service - boom again
java.lang.NullPointerException: x is null
    at c.xxx.Service.run(Service.java:10)
    at c.xxx.App.main(App.java:5)

运行:

python log_report.py -i app.log -o report.md

你会在报告中看到:

  • 同一异常被聚合
  • 次数 = 2
  • 时间范围一目了然

🛠 生产使用建议(非常重要)

✅ 1)配合定时任务跑(事故后自动生成)

0 */2 * * * /usr/bin/python3 /opt/tools/log_report.py \
  -i /var/log/app/app.log \
  -o /var/log/app/report.md

✅ 2)结合告警系统使用

常见玩法:

  • 错误密度 > 阈值
  • 或某异常次数突增

👉 自动把 report.md 推送到飞书/钉钉

下一篇我会单独写:
《日志分析报告自动推送器:只在“真的有问题”时通知你》


✅ 3)适配你自己的日志格式

只需要改两个地方:

  • TS_PATTERNS:补充时间格式
  • 异常起始正则:增加你们项目特有的异常类型

核心逻辑不用动。


🧠 为什么这个工具“值得留下来”?

因为它解决的不是“怎么解析日志”,
而是:

如何在事故发生后的 10 分钟内,
搞清楚:是不是同一个问题在反复炸。


👉 后续类似 “能直接用在生产里的自动化工具”
都会持续更新在《程序员自动化工具箱》。

如果你觉得这类内容比概念文章更有用
欢迎关注 / 订阅这个专栏。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐