张佳峰HackerNoon头像
@ wydfy111
张家峰
开源爱好者

从@ wydfy111接收故事

姓名@company.com

订阅
统一的分析数据库是数据工程师的圣杯,但它具体是什么样子的呢?它应该随着数据用户的需求而发展。

在纵向上,公司现在拥​​有不断扩大的数据池,并期望在数据处理中实现更高水平的并发。在横向上,他们需要范围更广的数据分析服务。

除了统计报告和临时查询等传统 OLAP 场景外,他们还在推荐系统、风险控制、客户标记和分析以及物联网中利用数据分析。

在所有这些数据服务中,点查询是数据用户进行的最频繁的操作。点查询是指根据Key从数据库中取出一行或几行。

点查询只返回一小部分数据,例如购物订单的详细信息、交易、消费者画像、产品描述、物流状态等。听起来很简单,对吧?

但比较棘手的是,数据库往往需要同时处理数万个点查询,并在毫秒内对所有查询做出响应。

大多数当前的 OLAP 数据库都是使用列式存储引擎构建的,以处理大量数据。他们以高吞吐量为荣,但在高并发场景下往往表现不佳。

作为补充,许多数据工程师邀请 Apache HBase 等 Key-Value 存储进行点查询,并使用 Redis 作为缓存层以减轻负担。缺点是冗余存储和高维护成本。

自Apache Doris诞生以来,我们一直在努力将其打造成一个统一的数据库,用于各种规模的数据查询,包括即席查询和点查询。至此,我们已经拿下了高吞吐OLAP场景的怪物。

在即将发布的 Apache Doris 2.0 中,我们针对高并发点查询进行了优化。长话短说,它可以实现单节点超过 30,000 QPS。

加速高并发查询的五种方法
高并发查询很棘手,因为您需要用有限的系统资源来处理高负载。这意味着您必须尽可能减少单个 SQL 的 CPU、内存和 I/O 开销。

关键是尽量减少对底层数据的扫描和后续计算。

Apache Doris 使用五种方法来获得更高的 QPS。

分区和分桶
Apache Doris 将数据分片为两层结构:Partition 和 Bucket。您可以使用时间信息作为分区键。至于分桶,您在数据散列后将数据分发到各个节点。

明智的分桶计划可以大大提高数据读取的并发性和吞吐量。

这是一个例子:

select * from user_table where id = 5122 and create_date = ‘2022-01-01’

在本例中,用户设置了 10 个桶。create_date是Partition Key,id是Bucket Key。将数据划分分区和桶后,系统只需要扫描一个分区中的一个桶,就可以定位到需要的数据。这是一个巨大的节省时间。

指数
Apache Doris 使用各种数据索引来加速数据读取和过滤,包括智能索引和二级索引。智能索引由 Doris 在数据摄取时自动生成,这不需要用户端的任何操作。

有两种类型的智能索引:

Sorted Index:Apache Doris 以有序的方式存储数据。它为每 1024 行数据创建一个排序索引。索引中的Key为当前1024行中第一行排好序的列的值。

如果查询涉及排好序的列,系统会定位相关的1024行组的第一行,并从那里开始扫描。

ZoneMap Index:这些是 Segment 和 Page 级别的索引。Page内每列的最大值和最小值将被记录,Segment内的值也是如此。因此,在等价查询和范围查询中,系统可以借助 MinMax 索引缩小过滤范围。

二级索引由用户创建。这些包括布隆过滤器索引、位图索引、倒排索引和NGram 布隆过滤器索引。(如果您有兴趣,我将在以后的文章中详细介绍它们。)

例子:

select * from user_table where id > 10 and id < 1024

假设用户id在建表时指定为Key;数据将id在 Memtable 和磁盘上排序。id因此,在排序索引的帮助下,任何涉及过滤条件的查询都将执行得更快。

具体来说,存储中的数据会根据 放入多个范围内id,系统会根据排序后的索引进行二分查找,定位到准确的范围内。

但这仍然可能是一个很大的范围,因为排序的索引是稀疏的。您可以根据 ZoneMap 索引、Bloom Filter 索引和 Bitmap 索引进一步缩小范围。

这是减少数据扫描并提高系统整体并发性的另一种方法。

物化视图
物化视图的思想是以空间换取时间:你用预定义的 SQL 语句执行预计算,并将结果永久保存在一个对用户可见但占用一些存储空间的表中。

通过这种方式,Apache Doris 可以更快地响应聚合数据和细分数据的查询,以及那些涉及排序索引匹配的查询,一旦它命中物化视图。这是减少计算、提高查询性能和减少资源消耗的好方法。

// For an aggregation query, the system reads the pre-aggregated columns in the materialized view.

create materialized view store_amt as select store_id, sum(sale_amt) from sales_records group by store_id;
SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id;

// For a query where k3 matches the sorted column in the materialized view, the system directly performs the query on the materialized view.

CREATE MATERIALIZED VIEW mv_1 as SELECT k3, k2, k1 FROM tableA ORDER BY k3;
select k1, k2, k3 from table A where k3=3;
运行时过滤器
Apache Doris 除了通过索引过滤数据外,还有一个动态过滤机制:Runtime Filter。

在多表Join查询中,左表通常称为ProbeTable,右表称为BuildTable,前者比后者大很多。在查询执行中,首先系统读取正确的表并在内存中创建一个HashTable(Build)。

然后开始逐行读取左表,同时将左表和HashTable进行数据比较,返回匹配的数据(Probe)。

那么 Apache Doris 中有什么新内容呢?在创建 HashTable 的过程中,Apache Doris 会为列生成一个过滤器。它可以是最小/最大过滤器或 IN 过滤器。

然后将filter下推到左表,可以利用filter筛选出数据,从而减少Probe节点需要传输和比较的数据量。

这就是运行时过滤器的工作原理。在大多数 Join 查询中,Runtime Filter 可以自动下推到最底层的扫描节点或分布式 Shuffle Join。换句话说,Runtime Filter 能够减少数据读取并缩短大多数 Join 查询的响应时间。

TOP-N优化
TOP-N查询是数据分析中经常出现的场景。例如,用户想要获取最近的 100 个订单或 5 个最高/最低价格的产品。此类查询的性能决定了实时分析的质量。

Apache Doris 为他们实现了 TOP-N 优化。这是怎么回事:

Apache Doris从Scanner层读取排序后的字段和查询字段,通过Heapsort只保留TOP-N条数据,在持续读取的同时实时更新TOP-N结果,动态下推到Scanner .

结合接收到的 TOP-N 范围和索引,Scanner 可以跳过大部分不相关的文件和数据块,只读取少量行。

平表查询通常意味着需要扫描海量数据,而TOP-N查询只能检索到少量数据。这里的策略是将数据读取过程分为两个阶段。在第一阶段,系统根据几个列(排序列或条件列)对数据进行排序,并找到 TOP-N 行。第二阶段,取出数据排序后的TOP-N行数据,然后根据行号检索目标数据。

综上所述,Apache Doris 对需要读取和排序的数据进行了剪枝,从而大大降低了 I/O、CPU 和内存资源的消耗。

Apache Doris 除了上述五种方法外,还通过 SQL Cache、Partition Cache 以及多种 Join 优化技术来提升并发性。

我们如何将并发提升到一个新的水平
通过以上方式,Apache Doris 可以实现单节点上千 QPS。但是在需要上万QPS的场景下,还是遇到了几个问题的瓶颈:

使用 Doris 的列式存储引擎,读取行很不方便。在平面表模型中,列式存储可能会导致更大的 I/O 使用。

OLAP 数据库的执行引擎和查询优化器有时对于简单的查询(点查询等)来说过于复杂。此类查询需要使用较短的管道进行处理,这应该在查询计划中加以考虑。

Doris 的 FE 模块,实现了 Java,负责对接 SQL 请求,解析查询计划。这些进程在高并发场景中可能会产生高 CPU 开销。

我们优化了 Apache Doris 来解决这些问题。(拉取请求GitHub)

行存储格式
正如我们所知,当用户只查询单行数据时,行存储的效率要高得多。所以我们在 Apache Doris 2.0 中引入了行存储格式。用户可以通过在建表语句中指定以下属性来启用行存储。

“store_row_column” = “true”

我们选择 JSONB 作为行存储的编码格式有以下三个原因:

灵活的模式变化:如果用户添加或删除了一个字段,或者修改了一个字段的类型,这些变化必须在行存储中实时更新。所以我们选择采用JSONB格式,将列编码为JSONB字段。这使得字段的更改非常容易。

高性能:在行存储中访问行比在列存储中访问行要快得多,并且在高并发场景下需要更少的磁盘访问。此外,在某些情况下,您可以将列 ID 映射到相应的 JSONB 值,以便快速访问特定列。

更少的存储空间:JSONB 是一种压缩的二进制格式。它在磁盘上占用的空间更少,并且更具成本效益。

在存储引擎中,行存储将存储为隐藏列(DORIS_ROW_STORE_COL)。在 Memtable Flush 期间,列将被编码为 JSONB 并缓存到这个隐藏列中。

在读取数据时,系统使用Column ID定位列,根据行号找到目标行,然后反序列化相关列。

短路
通常,一条SQL语句的执行分为三步:

SQL Parser 解析语句以生成抽象语法树 (AST)。

查询优化器生成一个可执行计划。

执行计划并返回结果。

对于海量数据的复杂查询,最好遵循查询优化器创建的计划。但是,对于需要低延迟的高并发点查询,该计划不仅没有必要,而且会带来额外的开销。

这就是我们为点查询实施短路计划的原因。

图像

一旦 FE 收到点查询请求,就会产生短路方案。它是一个轻量级的计划,不涉及等效转换、逻辑优化或物理优化。

相反,它对 AST 进行一些基本分析,据此创建一个固定的计划,并找到减少优化器开销的方法。

对于简单的涉及主键的点查询,比如select * from tbl where pk1 = 123 and pk2 = 456,由于只涉及单个Tablet,所以最好使用轻量级的RPC接口与Storage Engine进行交互。

这样就避免了创建复杂的Fragment Plan,消除了MPP查询框架下调度带来的性能开销。

RPC接口的详细信息如下:

message PTabletKeyLookupRequest {
required int64 tablet_id = 1;
repeated KeyTuple key_tuples = 2;
optional Descriptor desc_tbl = 4;
optional ExprList output_expr = 5;
}

message PTabletKeyLookupResponse {
required PStatus status = 1;
optional bytes row_batch = 5;
optional bool empty_batch = 6;
}
rpc tablet_fetch_data(PTabletKeyLookupRequest) returns (PTabletKeyLookupResponse);

tablet_id是根据主键列计算的,而key_tuples是主键的字符串格式。在此示例中,key_tuples类似于 [‘123’, ‘456’]。当 BE 收到请求时,key_tuples会编码成主键存储格式。

然后,它会借助主键索引在Segment File中定位Key对应的行号,并检查该行是否存在于delete bitmap. 如果是,则返回行号;如果没有,系统返回 NotFound。

返回的行号将用于 上的点查询__DORIS_ROW_STORE_COL__。这意味着我们只需要在该列中找到一行,获取 JSONB 格式的原始值,并将其反序列化。

准备好的声明
在高并发查询中,一部分CPU开销来自于FE中的SQL分析解析。为了减少这样的开销,在FE中,我们提供了完全兼容MySQL协议的prepared statements。

使用准备好的语句,我们可以实现主键查询的四倍性能提升。

图像

prepared statements 的思想是将预先计算好的 SQL 和 HashMap 中的表达式缓存在内存中,以便在需要的时候可以直接在查询中使用。

Prepared statements采用MySQL二进制协议进行传输。该协议在 mysql_row_buffer.[h|cpp] 文件中实现,并使用 MySQL 二进制编码。

在该协议下,客户端(如JDBC Client)通过PREPAREMySQL Command向FE发送预编译语句。

接下来FE会对语句进行解析分析,缓存到上图的HashMap中。接下来,客户端使用EXECUTEMySQL Command 将占位符替换,并将其编码为二进制格式,并将其发送给 FE。

然后,FE会进行反序列化,获取占位符的值,生成查询条件。

图像

除了在 FE 中缓存准备好的语句外,我们还在 BE 中缓存可重用的结构。这些结构包括预分配的计算块、查询描述符和输出表达式。

序列化和反序列化这些结构通常会导致 CPU 热点,因此缓存它们更有意义。

每个查询的准备好的语句都带有一个名为 CacheID 的 UUID。所以BE在执行点查询的时候,会根据CacheID找到对应的类,然后在计算中复用这个结构体。

以下示例演示了如何在 JDBC 中使用准备好的语句:

第一步:设置一个JDBC URL,在服务器端启用prepared statement。

url = jdbc:mysql://127.0.0.1:9030/ycsb?useServerPrepStmts=true

第 2 步:使用准备好的语句。

// Use ? as placeholder, reuse readStatement.
PreparedStatement readStatement = conn.prepareStatement(“select * from tbl_point_query where key = ?”);

readStatement.setInt(1234);
ResultSet resultSet = readStatement.executeQuery();

readStatement.setInt(1235);
resultSet = readStatement.executeQuery();

行存储缓存
Apache Doris 有一个页面缓存功能,每个页面缓存一列的数据。

图像

如上所述,我们在 Doris 中引入了行存储。这样做的问题是一行数据由多列组成,因此在大查询的情况下,缓存的数据可能会被删除。因此,我们还引入了行缓存来提高行缓存命中率。

Row Cache 复用了 Apache Doris 中的 LRU Cache 机制。当缓存启动时,系统会初始化一个阈值。如果达到该阈值,旧的缓存行将被淘汰。

对于主键查询语句,缓存命中和缓存未命中之间的性能差距可能很大(我们在这里谈论的是磁盘 I/O 和内存访问减少数十倍)。因此,行缓存的引入可以显着提高点查询性能。

图像

要启用行缓存,您可以在 BE 中指定以下配置:

disable_storage_row_cache=false // This specifies whether to enable row cache; it is set to false by default.
row_cache_mem_limit=20% // This specifies the percentage of row cache in the memory; it is set to 20% by default.
基准性能
我们使用 YCSB(Yahoo! Cloud Serving Benchmark)测试了 Apache Doris,以了解所有这些优化是如何工作的。

配置和数据大小:

机器:单台16核64G云服务器,4×1T硬盘

集群大小:1 个前端 + 2 个后端

数据量:1亿行数据,每行1KB存储;预热

表架构和查询语句:

// Table creation statement:

CREATE TABLE usertable (
YCSB_KEY varchar(255) NULL,
FIELD0 text NULL,
FIELD1 text NULL,
FIELD2 text NULL,
FIELD3 text NULL,
FIELD4 text NULL,
FIELD5 text NULL,
FIELD6 text NULL,
FIELD7 text NULL,
FIELD8 text NULL,
FIELD9 text NULL
) ENGINE=OLAP
UNIQUE KEY(YCSB_KEY)
COMMENT ‘OLAP’
DISTRIBUTED BY HASH(YCSB_KEY) BUCKETS 16
PROPERTIES (
“replication_allocation” = “tag.location.default: 1”,
“in_memory” = “false”,
“persistent” = “false”,
“storage_format” = “V2”,
“enable_unique_key_merge_on_write” = “true”,
“light_schema_change” = “true”,
“store_row_column” = “true”,
“disable_auto_compaction” = “false”
);

// Query statement:

SELECT * from usertable WHERE YCSB_KEY = ?

我们在启用优化(行存储、短路和准备语句)的情况下运行测试,然后在禁用所有优化的情况下再次进行测试。以下是结果:

图像

启用优化后,平均查询时延下降96%,99th百分位时延仅为未优化时的1/28,并实现了30000QPS以上的查询并发。

这是性能上的巨大飞跃,并发量提高了 20 多倍。

最佳实践
需要注意的是,这些针对点查询的优化是在 Apache Doris 的 Unique Key 模型中实现的,您应该为此模型启用 Merge-on-Write 和 Light Schema Change。

这是一个点查询的建表语句示例:

CREATE TABLE usertable (
USER_KEY BIGINT NULL,
FIELD0 text NULL,
FIELD1 text NULL,
FIELD2 text NULL,
FIELD3 text NULL
) ENGINE=OLAP
UNIQUE KEY(USER_KEY)
COMMENT ‘OLAP’
DISTRIBUTED BY HASH(USER_KEY) BUCKETS 16
PROPERTIES (
“enable_unique_key_merge_on_write” = “true”,
“light_schema_change” = “true”,
“store_row_column” = “true”,
);

笔记:

启用light_schema_change以支持编码 ColumnID 的 JSONB 行存储
启用store_row_column存储行存储格式

对于像下面这样的基于主键的点查询,在建表后,可以使用行存储和短路执行来极大地提高性能。

select * from usertable where USER_KEY = xxx;

为了进一步释放性能,您可以应用准备好的语句。如果你有足够的内存空间,你也可以在 BE 配置中启用行缓存。

结论
在高并发场景下,Apache Doris经过行存储、短路、prepared statement、row cache等优化后实现了30000+QPS。

此外,Apache Doris 很容易横向扩展,因为它建立在 MPP 架构之上,您可以通过升级硬件和机器配置来扩展它。

Apache Doris 就是这样实现高吞吐量和高并发的。可让您在一个平台上处理各种数据分析工作负载,体验各种场景的快速数据分析。

在这里插入图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐