如何基于Flink + Clickhouse打造百亿规模日志查询平台
摘要:本文探讨了日志检索系统从Elasticsearch迁移到ClickHouse的技术方案。Elasticsearch虽具备全文检索优势,但其分词特性与日志场景所需的精准模糊匹配存在本质冲突。相比之下,ClickHouse基于原始字符串匹配的朴素模型更符合日志查询需求。文章详细介绍了基于Flink的高吞吐日志写入架构,强调批量写入对ClickHouse性能的关键影响,并提供了multiMatch
一、背景:为什么弃用 Elasticsearch?选择 ClickHouse?
在早期日志检索系统中,Elasticsearch(ES)几乎是事实标准:
-
倒排索引,全文检索能力强
-
生态成熟,Kibana 使用友好
-
对研发同学上手成本低
但是:日志查询需要“精准模糊匹配”,而不是分词搜索
在日志场景中,一个经常被忽略但非常关键的事实是:
日志查询 ≠ 自然语言搜索
绝大多数日志检索关注的是:
-
traceId / requestId
-
error code
-
固定格式的异常片段
-
精确字符串上下文
例如:
#0 (cfeebeea05bd01966dc6320727f2fa7
java.lang.IllegalStateException
Timeout while waiting for xxx
这些内容的共同特点是:
-
强顺序性
-
不可分词
-
分词后反而会破坏语义
ES 在日志场景下的“隐性问题”
为了“不分词”,ES 在日志场景下需要付出的额外代价
在真实生产环境中,为了让 ES 尽量接近“原始字符串匹配”,通常需要做大量额外工作:
必须使用 keyword 字段
-
text字段会被 analyzer 分词,无法用于精准日志检索 -
需要在 mapping 中同时维护:
{
"message": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 32766
}
}
}
}
mapping 复杂度显著提升。
对比之下,ClickHouse 更“朴素”:
-
不分词
-
不猜测语义
-
不依赖 mapping 技巧
只需要:
position(message, 'IllegalStateException') > 0
或者:
multiMatchAny(message, ['timeout', 'connection refused'])
即可完成日志工程中最常见、也是最可靠的查询需求。
在日志系统中,复杂并不等于高级,可预期才是核心竞争
这种模模式:
-
更符合日志系统查询需求
-
查询结果可预期
-
不存在 analyzer 差异
对日志系统来说: “精准的模糊匹配” 比 “智能分词” 更重要
| 维度 | ES | ClickHouse |
| 数据模型 | 文档 | 列式 |
| 存储成本 | 高 | 低 3~5 倍 |
| 写入吞吐 | 一般 | 极高(百万/s) |
| 批量分析 | 一般 | 极强 |
| 运维复杂度 | 高 | 相对简单 |
对日志系统而言:先过滤,再字符串匹配,ClickHouse 更合适
二、基于 Flink 的高吞吐日志写入方案
2.1 总体架构
日志源 → Kafka → Flink → ClickHouse
-
Kafka:削峰、缓冲
-
Flink:
-
清洗
-
结构化
-
批量写入
-
-
ClickHouse:长期存储 + 查询
2.2 为什么必须“批量写入”?
ClickHouse 的核心特性:
-
Append Only
-
写入单位是 Block(而不是单行)
❌ 单条 INSERT:
-
QPS 高
-
IO 放大严重
✅ 批量 INSERT:
-
吞吐提升 10~100 倍
-
Merge 压力显著降低
2.3 Flink 写入 ClickHouse 的关键点
1️⃣ Sink 侧缓存
-
按 条数 / 时间 flush
-
常见配置:
-
5k~20k 行
-
或 1~3 秒
-
2️⃣ 异步 + 批量
-
使用
PreparedStatement.addBatch() -
禁止逐条 execute
3️⃣ 严格控制并发
-
并发过高会导致:
-
Too many parts
-
Merge backlog
-
实战经验:并发 < CPU 核数
这里只提供自己写的clickhouse_sink供大家参考,整体flink代码参考flink官网
package com.techwolf.datastar.sink;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class LogClickhouseSink extends RichSinkFunction<String> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogClickhouseSink.class);
private List<JSONObject> logList;
private Connection LogConnection;
private JSONObject logObj;
private String INSERT_TCP_SQL = "insert into xx_job_log_local(threadId, " +
"dateTime, loggerFqcn, level, endOfBatch, thread, message, " +
"threadPriority,yarnContainerId, instant, timestamp, loggerName ) " +
"values(?,?,?,?,?,?,?,?,?,?,?,?) " ;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
logList = new ArrayList<>();
LogConnection = getLogConnection();
flinkLogConnection.setAutoCommit(true);
}
@Override
public void invoke(String value, Context context) throws Exception {
super.invoke(value, context);
try {
logObj = JSONObject.parseObject(value);
logObj.put("message", getStackTraceMessage(logObj));
logList.add(logObj);
if (logList.size() < 1000) {
return;
}
PreparedStatement preparedStatement = flinkLogConnection.prepareStatement(INSERT_TCP_SQL);
for (JSONObject event : logList) {
preparedStatement.setLong(1, event.getInteger("threadId"));
preparedStatement.setString(2, event.getString("dateTime"));
preparedStatement.setString(3, event.getString("loggerFqcn"));
preparedStatement.setString(4, event.getString("level"));
preparedStatement.setString(5, event.getString("endOfBatch"));
preparedStatement.setString(6, event.getString("thread"));
preparedStatement.setString(7, event.getString("message"));
preparedStatement.setString(8, event.getString("threadPriority"));
preparedStatement.setString(9, event.getString("yarnContainerId"));
preparedStatement.setString(10, event.getString("instant"));
preparedStatement.setLong(11, event.getLong("timestamp"));
preparedStatement.setString(12, event.getString("loggerName"));
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
preparedStatement.clearBatch();
flinkLogConnection.commit();
//清空日志list
logList.clear();
preparedStatement.close();
} catch (Exception e) {
LOGGER.error("parser json log event fail :" + e);
LOGGER.error("parser value :" + value);
}
}
/**
* 初始化日志栈
* @param jsonObject
* @return
*/
private static String getStackTraceMessage(JSONObject jsonObject) {
String logMessage = jsonObject.getString("message");
try {
StringBuilder stackTranceSb = new StringBuilder();
JSONObject logThrownObj = jsonObject.getJSONObject("thrown");
if (Objects.nonNull(logThrownObj)) {
String thrownMessage = StringUtils.defaultString(logThrownObj.getString("message"), "");
JSONArray stackTraceArray = logThrownObj.getJSONArray("extendedStackTrace");
String causeMessage = StringUtils.EMPTY;
if (Objects.nonNull(stackTraceArray)) {
JSONObject causeObj = logThrownObj.getJSONObject("cause");
if (Objects.nonNull(causeObj)) {
causeMessage = causeObj.getString("message");
JSONArray extendedStackTraceObjArray = causeObj.getJSONArray("extendedStackTrace");
stackTraceArray.addAll(extendedStackTraceObjArray);
}
for (Object stackTrace : stackTraceArray) {
String thrownStackTrance = parserMessageMethod(JSONObject.parseObject(String.valueOf(stackTrace)));
stackTranceSb.append(thrownStackTrance).append("\n");
}
}
logMessage = logMessage.concat(" ").concat(thrownMessage).concat(causeMessage).concat(stackTranceSb.toString());
}
} catch (Exception e) {
LOGGER.error("Get stack trace :" + ExceptionUtils.getStackTrace(e));
LOGGER.error("Get stack trace json:" + jsonObject);
}
return logMessage;
}
private static String parserMessageMethod(JSONObject extendedStackTraceObj) {
String aClass = extendedStackTraceObj.getString("class");
String method = extendedStackTraceObj.getString("method");
String file = StringUtils.defaultIfBlank(extendedStackTraceObj.getString("file"), "");
String line = extendedStackTraceObj.getString("line");
String causeMessage = "at ".concat(aClass).concat(".").concat(method).concat("(").concat(file.concat(":").concat(line)).concat(")");
return causeMessage;
}
}
三、使用 multiMatchAny 高效查询多个关键字
3.1 日志检索的真实需求
大多数日志查询:
-
多个关键词 OR 或者 And
-
不需要复杂分词
-
更像是:字符串包含判断
3.2 multiMatchAny 的作用
multiMatchAny(message, ['error', 'timeout', 'exception'])
等价于:
message REGEXP 'error|timeout|exception'
优势:
-
一次编译正则
-
多关键词 OR
-
比多个
LIKE更优雅
3.3 必须注意的坑(非常重要)
⚠️ multiMatchAny 使用的是正则,不是普通字符串
如果关键词来自:
-
用户输入
-
日志内容
必须做正则转义,否则直接 BAD_ARGUMENTS
正确做法(Java)
Pattern.quote(keyword)
生成:
\Qerror (xxx)\E
👉 保证安全 + 不误匹配
3.4 性能建议
| 场景 | 建议 |
| 1~2 个关键词 | position |
| 3~20 个 | multiMatchAny |
| 上百个 | 拆批 or 预聚合 |
四、ClickHouse 建表的关键注意事项
4.1 表引擎选择
ENGINE = MergeTree
日志场景不建议:
-
ReplacingMergeTree
-
CollapsingMergeTree
日志是天然 append-only
4.2 分区设计(极其重要)
PARTITION BY toDate(event_time)
原则:
-
按时间分区
-
控制单分区大小
经验值:
-
单分区 < 50GB
4.3 ORDER BY 的选择
推荐:
ORDER BY (service, event_time)
原因:
-
查询通常先按 service 过滤
-
再按时间范围
❌ 不要把 message 放进 ORDER BY
4.4 字段类型优化
| 字段 | 建议类型 |
| service | LowCardinality(String) |
| level | Enum8 |
| message | String |
| trace_id | String |
👉 LowCardinality 对日志非常友好
4.5 TTL 与冷热数据
TTL event_time + INTERVAL 30 DAY DELETE
或:冷数据迁移到低成本磁盘
日志表建表语句参考:
CREATE TABLE xxx_platform_log_localsql
(
`threadId` Int64,
`dateTime` String,
`loggerFqcn` String,
`level` String,
`endOfBatch` String,
`thread` String,
`loggerName` String,
`message` String,
`threadPriority` String,
`yarnContainerId` String,
`instant` String,
`timestamp` UInt64,
INDEX idx_level level TYPE ngrambf_v1(4, 512, 2, 0) GRANULARITY 4,
INDEX idx_message message TYPE ngrambf_v1(4, 512, 2, 0) GRANULARITY 4,
INDEX idx_yarnContainerId yarnContainerId TYPE ngrambf_v1(4, 512, 2, 0) GRANULARITY 4,
INDEX idx_timestamp timestamp TYPE minmax GRANULARITY 4
)
ENGINE = MergeTree
PARTITION BY toHour(toDateTime(dateTime))
ORDER BY dateTime
TTL toDateTime(dateTime) + toIntervalDay(15)
SETTINGS index_granularity = 8192
mybatis查询日志语句参考
<select id="searchJobLog" resultType="xxxx.dao.JobLogDo">
SELECT
threadId,
dateTime,
loggerFqcn,
level,
endOfBatch,
thread,
loggerName,
message,
threadPriority,
yarnContainerId,
instant,
timestamp
FROM xxx_log_local
WHERE
<if test="containerIds != null and containerIds.size() > 0">
yarnContainerId IN
<foreach collection="containerIds" item="containerId" open="(" separator="," close=")">
#{containerId}
</foreach>
</if>
<choose>
<when test="cursorTimeStamp != null ">
AND dateTime > #{cursorTimeStamp}
<if test="endTimeStamp != null">
AND dateTime <= #{endTimeStamp}
</if>
</when>
<when test="startTimeStamp != null and endTimeStamp != null">
AND dateTime >= #{startTimeStamp}
AND dateTime <= #{endTimeStamp}
</when>
<when test="startTimeStamp != null">
AND dateTime >= #{startTimeStamp}
</when>
</choose>
<if test="logLevel != null and logLevel != ''">
AND level = #{logLevel}
</if>
<if test="keywords != null and keywords.size() > 0">
<!-- 关键字为且关系:所有关键字在可搜索字段中均需匹配才命中 -->
AND length(multiMatchAllIndices(
concat(
coalesce(dateTime, ''), ' ',
coalesce(loggerFqcn, ''), ' ',
coalesce(level, ''), ' ',
coalesce(endOfBatch, ''), ' ',
coalesce(thread, ''), ' ',
coalesce(loggerName, ''), ' ',
coalesce(message, ''), ' ',
coalesce(threadPriority, ''), ' ',
coalesce(yarnContainerId, ''), ' ',
coalesce(instant, ''), ' '
),
<foreach collection="keywords" item="keyword" open="[" separator="," close="]">
#{keyword}
</foreach>
)) = #{keywordsSize}
</if>
<if test="excludeKeywords != null and excludeKeywords.size() > 0">
<!-- 多字段排除关键字:上述所有字段中均不含任一排除关键字 -->
AND multiMatchAny(
concat(
coalesce(dateTime, ''), ' ',
coalesce(loggerFqcn, ''), ' ',
coalesce(level, ''), ' ',
coalesce(endOfBatch, ''), ' ',
coalesce(thread, ''), ' ',
coalesce(loggerName, ''), ' ',
coalesce(message, ''), ' ',
coalesce(threadPriority, ''), ' ',
coalesce(yarnContainerId, ''), ' ',
coalesce(instant, ''), ' '
),
<foreach collection="excludeKeywords" item="keyword" open="[" separator="," close="]">
#{keyword}
</foreach>
) = 0
</if>
ORDER BY timestamp ASC, threadId ASC, yarnContainerId ASC
LIMIT #{size}
五、总结
在百亿级日志场景下:
-
ES:
-
强在全文搜索
-
弱在成本与写入
-
-
ClickHouse:
-
强在高吞吐写入
-
极致压缩
-
批量分析与过滤
-
通过 Flink + ClickHouse + 合理的查询设计, 完全可以构建一个 低成本、高性能、可长期存储 的日志检索系统。
一句话结论:
日志不是搜索引擎问题,而是 OLAP 问题
更多推荐
所有评论(0)