一、背景:为什么弃用 Elasticsearch?选择 ClickHouse?

在早期日志检索系统中,Elasticsearch(ES)几乎是事实标准

  • 倒排索引,全文检索能力强

  • 生态成熟,Kibana 使用友好

  • 对研发同学上手成本低

但是:日志查询需要“精准模糊匹配”,而不是分词搜索

在日志场景中,一个经常被忽略但非常关键的事实是:

日志查询 ≠ 自然语言搜索

绝大多数日志检索关注的是:

  • traceId / requestId

  • error code

  • 固定格式的异常片段

  • 精确字符串上下文

例如:

#0 (cfeebeea05bd01966dc6320727f2fa7

java.lang.IllegalStateException

Timeout while waiting for xxx

这些内容的共同特点是:

  • 强顺序性

  • 不可分词

  • 分词后反而会破坏语义


ES 在日志场景下的“隐性问题”

为了“不分词”,ES 在日志场景下需要付出的额外代价

在真实生产环境中,为了让 ES 尽量接近“原始字符串匹配”,通常需要做大量额外工作:

 必须使用 keyword 字段

  • text 字段会被 analyzer 分词,无法用于精准日志检索

  • 需要在 mapping 中同时维护:

{

"message": {

"type": "text",

"fields": {

"keyword": {

"type": "keyword",

"ignore_above": 32766

}

}

}

}

mapping 复杂度显著提升。


对比之下,ClickHouse 更“朴素”:

  • 不分词

  • 不猜测语义

  • 不依赖 mapping 技巧

只需要:

position(message, 'IllegalStateException') > 0

或者:

multiMatchAny(message, ['timeout', 'connection refused'])

即可完成日志工程中最常见、也是最可靠的查询需求。


在日志系统中,复杂并不等于高级,可预期才是核心竞争

这种模模式:
  • 更符合日志系统查询需求

  • 查询结果可预期

  • 不存在 analyzer 差异


对日志系统来说: “精准的模糊匹配” 比 “智能分词” 更重要

维度 ES ClickHouse
数据模型 文档 列式
存储成本 低 3~5 倍
写入吞吐 一般 极高(百万/s)
批量分析 一般 极强
运维复杂度 相对简单

对日志系统而言:先过滤,再字符串匹配,ClickHouse 更合适


二、基于 Flink 的高吞吐日志写入方案

2.1 总体架构

日志源 → Kafka → Flink → ClickHouse

  • Kafka:削峰、缓冲

  • Flink:

    • 清洗

    • 结构化

    • 批量写入

  • ClickHouse:长期存储 + 查询


2.2 为什么必须“批量写入”?

ClickHouse 的核心特性:

  • Append Only

  • 写入单位是 Block(而不是单行)

❌ 单条 INSERT:

  • QPS 高

  • IO 放大严重

✅ 批量 INSERT:

  • 吞吐提升 10~100 倍

  • Merge 压力显著降低


2.3 Flink 写入 ClickHouse 的关键点

1️⃣ Sink 侧缓存
  • 条数 / 时间 flush

  • 常见配置:

    • 5k~20k 行

    • 或 1~3 秒

2️⃣ 异步 + 批量
  • 使用 PreparedStatement.addBatch()

  • 禁止逐条 execute

3️⃣ 严格控制并发
  • 并发过高会导致:

    • Too many parts

    • Merge backlog

实战经验:并发 < CPU 核数

这里只提供自己写的clickhouse_sink供大家参考,整体flink代码参考flink官网

package com.techwolf.datastar.sink;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class LogClickhouseSink extends RichSinkFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LogClickhouseSink.class);

    private List<JSONObject> logList;
    private Connection LogConnection;
    private JSONObject logObj;

    private String INSERT_TCP_SQL = "insert into xx_job_log_local(threadId, " +
            "dateTime, loggerFqcn, level, endOfBatch, thread, message, " +
            "threadPriority,yarnContainerId, instant, timestamp, loggerName ) " +
            "values(?,?,?,?,?,?,?,?,?,?,?,?) " ;


    @Override
    public void open(Configuration parameters) throws Exception {

        super.open(parameters);
        logList = new ArrayList<>();
        LogConnection = getLogConnection();
        flinkLogConnection.setAutoCommit(true);

    }


    @Override
    public void invoke(String value, Context context) throws Exception {
        super.invoke(value, context);
        try {
            logObj = JSONObject.parseObject(value);
            logObj.put("message", getStackTraceMessage(logObj));
            logList.add(logObj);
            if (logList.size() < 1000) {
              return;
            }
            PreparedStatement preparedStatement = flinkLogConnection.prepareStatement(INSERT_TCP_SQL);
            for (JSONObject event : logList) {
                preparedStatement.setLong(1, event.getInteger("threadId"));
                preparedStatement.setString(2, event.getString("dateTime"));
                preparedStatement.setString(3, event.getString("loggerFqcn"));
                preparedStatement.setString(4, event.getString("level"));
                preparedStatement.setString(5, event.getString("endOfBatch"));
                preparedStatement.setString(6, event.getString("thread"));
                preparedStatement.setString(7, event.getString("message"));
                preparedStatement.setString(8, event.getString("threadPriority"));
                preparedStatement.setString(9, event.getString("yarnContainerId"));
                preparedStatement.setString(10, event.getString("instant"));
                preparedStatement.setLong(11, event.getLong("timestamp"));
                preparedStatement.setString(12, event.getString("loggerName"));
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
            preparedStatement.clearBatch();
            flinkLogConnection.commit();
            //清空日志list
            logList.clear();
            preparedStatement.close();

        } catch (Exception e) {
            LOGGER.error("parser json log event fail :" + e);
            LOGGER.error("parser value  :" + value);
        }

    }

    /**
     * 初始化日志栈
     * @param jsonObject
     * @return
     */
    private static String getStackTraceMessage(JSONObject jsonObject) {
        String logMessage = jsonObject.getString("message");
        try {
            StringBuilder stackTranceSb = new StringBuilder();
            JSONObject  logThrownObj = jsonObject.getJSONObject("thrown");
            if (Objects.nonNull(logThrownObj)) {
                String thrownMessage = StringUtils.defaultString(logThrownObj.getString("message"), "");
                JSONArray stackTraceArray = logThrownObj.getJSONArray("extendedStackTrace");
                String causeMessage = StringUtils.EMPTY;
                if (Objects.nonNull(stackTraceArray)) {
                    JSONObject causeObj = logThrownObj.getJSONObject("cause");
                    if (Objects.nonNull(causeObj)) {
                        causeMessage = causeObj.getString("message");
                        JSONArray extendedStackTraceObjArray = causeObj.getJSONArray("extendedStackTrace");
                        stackTraceArray.addAll(extendedStackTraceObjArray);
                    }
                    for (Object stackTrace : stackTraceArray) {
                        String thrownStackTrance = parserMessageMethod(JSONObject.parseObject(String.valueOf(stackTrace)));
                        stackTranceSb.append(thrownStackTrance).append("\n");
                    }
                }
                logMessage =  logMessage.concat(" ").concat(thrownMessage).concat(causeMessage).concat(stackTranceSb.toString());

            }
        } catch (Exception e) {
            LOGGER.error("Get stack trace :" + ExceptionUtils.getStackTrace(e));
            LOGGER.error("Get stack trace  json:" + jsonObject);
        }
        return logMessage;
    }


    private static String parserMessageMethod(JSONObject extendedStackTraceObj) {
        String aClass = extendedStackTraceObj.getString("class");
        String method = extendedStackTraceObj.getString("method");
        String file   = StringUtils.defaultIfBlank(extendedStackTraceObj.getString("file"), "");
        String line = extendedStackTraceObj.getString("line");
        String causeMessage = "at ".concat(aClass).concat(".").concat(method).concat("(").concat(file.concat(":").concat(line)).concat(")");
        return causeMessage;
    }


}

三、使用 multiMatchAny 高效查询多个关键字

3.1 日志检索的真实需求

大多数日志查询:

  • 多个关键词 OR 或者 And

  • 不需要复杂分词

  • 更像是:字符串包含判断


3.2 multiMatchAny 的作用

multiMatchAny(message, ['error', 'timeout', 'exception'])

等价于:

message REGEXP 'error|timeout|exception'

优势:

  • 一次编译正则

  • 多关键词 OR

  • 比多个 LIKE 更优雅


3.3 必须注意的坑(非常重要)

⚠️ multiMatchAny 使用的是正则,不是普通字符串

如果关键词来自:

  • 用户输入

  • 日志内容

必须做正则转义,否则直接 BAD_ARGUMENTS

正确做法(Java)

Pattern.quote(keyword)

生成:

\Qerror (xxx)\E

👉 保证安全 + 不误匹配


3.4 性能建议

场景 建议
1~2 个关键词 position
3~20 个 multiMatchAny
上百个 拆批 or 预聚合

四、ClickHouse 建表的关键注意事项

4.1 表引擎选择

ENGINE = MergeTree

日志场景不建议:

  • ReplacingMergeTree

  • CollapsingMergeTree

日志是天然 append-only


4.2 分区设计(极其重要)

PARTITION BY toDate(event_time)

原则:

  • 按时间分区

  • 控制单分区大小

经验值:

  • 单分区 < 50GB


4.3 ORDER BY 的选择

推荐:

ORDER BY (service, event_time)

原因:

  • 查询通常先按 service 过滤

  • 再按时间范围

❌ 不要把 message 放进 ORDER BY


4.4 字段类型优化

字段 建议类型
service LowCardinality(String)
level Enum8
message String
trace_id String

👉 LowCardinality 对日志非常友好


4.5 TTL 与冷热数据

TTL event_time + INTERVAL 30 DAY DELETE

或:冷数据迁移到低成本磁盘

日志表建表语句参考:

CREATE TABLE xxx_platform_log_localsql
(
    `threadId` Int64,
    `dateTime` String,
    `loggerFqcn` String,
    `level` String,
    `endOfBatch` String,
    `thread` String,
    `loggerName` String,
    `message` String,
    `threadPriority` String,
    `yarnContainerId` String,
    `instant` String,
    `timestamp` UInt64,
    INDEX idx_level level TYPE ngrambf_v1(4, 512, 2, 0) GRANULARITY 4,
    INDEX idx_message message TYPE ngrambf_v1(4, 512, 2, 0) GRANULARITY 4,
    INDEX idx_yarnContainerId yarnContainerId TYPE ngrambf_v1(4, 512, 2, 0) GRANULARITY 4,
    INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY toHour(toDateTime(dateTime))
ORDER BY dateTime
TTL toDateTime(dateTime) + toIntervalDay(15)
SETTINGS index_granularity = 8192

mybatis查询日志语句参考

<select id="searchJobLog" resultType="xxxx.dao.JobLogDo">
        SELECT
            threadId,
            dateTime,
            loggerFqcn,
            level,
            endOfBatch,
            thread,
            loggerName,
            message,
            threadPriority,
            yarnContainerId,
            instant,
            timestamp
        FROM  xxx_log_local
        WHERE
        <if test="containerIds != null and containerIds.size() > 0">
             yarnContainerId IN
            <foreach collection="containerIds" item="containerId" open="(" separator="," close=")">
                #{containerId}
            </foreach>
        </if>
        <choose>
            <when test="cursorTimeStamp != null ">
                AND dateTime > #{cursorTimeStamp}
                <if test="endTimeStamp != null">
                    AND dateTime &lt;=  #{endTimeStamp}
                </if>
            </when>
            <when test="startTimeStamp != null and endTimeStamp != null">
                AND dateTime &gt;=  #{startTimeStamp}
                AND dateTime &lt;=  #{endTimeStamp}
            </when>
            <when test="startTimeStamp != null">
                AND dateTime &gt;= #{startTimeStamp}
            </when>
        </choose>
        <if test="logLevel != null and logLevel != ''">
            AND level = #{logLevel}
        </if>
        <if test="keywords != null and keywords.size() > 0">
            <!-- 关键字为且关系:所有关键字在可搜索字段中均需匹配才命中 -->
            AND length(multiMatchAllIndices(
                concat(
                    coalesce(dateTime, ''), ' ',
                    coalesce(loggerFqcn, ''), ' ',
                    coalesce(level, ''), ' ',
                    coalesce(endOfBatch, ''), ' ',
                    coalesce(thread, ''), ' ',
                    coalesce(loggerName, ''), ' ',
                    coalesce(message, ''), ' ',
                    coalesce(threadPriority, ''), ' ',
                    coalesce(yarnContainerId, ''), ' ',
                    coalesce(instant, ''), ' '
                ),
                <foreach collection="keywords" item="keyword" open="[" separator="," close="]">
                    #{keyword}
                </foreach>
            )) = #{keywordsSize}
        </if>
        <if test="excludeKeywords != null and excludeKeywords.size() > 0">
            <!-- 多字段排除关键字:上述所有字段中均不含任一排除关键字 -->
            AND multiMatchAny(
                concat(
                    coalesce(dateTime, ''), ' ',
                    coalesce(loggerFqcn, ''), ' ',
                    coalesce(level, ''), ' ',
                    coalesce(endOfBatch, ''), ' ',
                    coalesce(thread, ''), ' ',
                    coalesce(loggerName, ''), ' ',
                    coalesce(message, ''), ' ',
                    coalesce(threadPriority, ''), ' ',
                    coalesce(yarnContainerId, ''), ' ',
                    coalesce(instant, ''), ' '
                ),
                <foreach collection="excludeKeywords" item="keyword" open="[" separator="," close="]">
                    #{keyword}
                </foreach>
            ) = 0
        </if>
        ORDER BY timestamp ASC, threadId ASC, yarnContainerId ASC
        LIMIT #{size}

五、总结

在百亿级日志场景下:

  • ES

    • 强在全文搜索

    • 弱在成本与写入

  • ClickHouse

    • 强在高吞吐写入

    • 极致压缩

    • 批量分析与过滤

通过 Flink + ClickHouse + 合理的查询设计, 完全可以构建一个 低成本、高性能、可长期存储 的日志检索系统。


一句话结论

日志不是搜索引擎问题,而是 OLAP 问题

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐