springboot 实现elastic search 千万级数据同步 ,mysql 深度分页处理
(1)更新数据量太大,如果没有使用 消息队列则会对服务器造成相当大的压力(2)即使用消息队列,产生的消息也相当多,要注意消息积压的问题。
当es需要加入或者更新几个字段,按业务需求可能要同步近一年,或者所有的mysql数据到es中,该怎么实现?
1.如果本来项目中es的插入是配置了第三方监听mysql binlog实现的。那么可以写一条sql批量更新
问题:
(1)更新数据量太大,如果没有使用 消息队列则会对服务器造成相当大的压力
(2)即使用消息队列,产生的消息也相当多,要注意消息积压的问题
2.写一个方法,读取mysql中的数据然后处理再更新es
问题:
(1)千万条数据怎么读mysql比较好?
(2)es一次处理多少条?
为了避免内存占用过大,我们一次最好不要查询太多的数据,那么是否使用分页呢?
比如1000万的数据,一页10000条,则mysql要查询至少1000次,而分页查询存在深度分页问题,即偏移量越大,查询需要的时间越长,后面甚至要几十秒甚至到几分钟
解决方案:
1.1.不使用分页,直接分割数据
比如要查一年的数据,切割时间对象,在知道开始时间和结束时间的情况下按天数切割,根据数据量大小,以及单个数据的长度评估吗,平均一下一次查询比如查
差不多10w条,那么就按三天切割
public static List<TimeObject> cutTime(String beginTime, String endTime, long hours) {
List<TimeObject> timeObjects = new ArrayList();
Date start = parse(beginTime, "yyyy-MM-dd HH:mm:ss");
Date end = parse(endTime, "yyyy-MM-dd HH:mm:ss");
Date newEndTime;
for(long cutNum = hours * 3600L * 1000L; start.getTime() + cutNum < end.getTime(); start = newEndTime) {
newEndTime = new Date(start.getTime() + cutNum);
timeObjects.add(new TimeObject(toDateString(start, "yyyy-MM-dd HH:mm:ss"), toDateString(newEndTime, "yyyy-MM-dd HH:mm:ss")));
}
timeObjects.add(new TimeObject(toDateString(start, "yyyy-MM-dd HH:mm:ss"), endTime));
return timeObjects;
}
public static String toDateString(Date date, String pattern) {
return (new DateTime(date)).toString(pattern);
}
1.2 切割id
一般id都是递增,那么知道起始id和结束id的情况下可以切割id,切割id会比切割时间更均匀,一次10w也可以
1.3 使用游标
按id排序,每次查询的时候获取最后一条,作为下次查询的条件 (id>lastId),直接limit条数即可,字段如果不多且不大,一次查10w条也可以
<select id="selectRecordsById" parameterType="Long" resultType="YourResultType">
SELECT field1, field2, field3
FROM your_table
WHERE id > #{lastId}
AND created between #{beginTime} and #{endTime}
ORDER BY id
LIMIT #{limit}
</select>
Long lastId = 0L;
int pageSize = 100000;
boolean hasMoreData = true;
while (hasMoreData) {
List<YourResultType> results = yourMapper.selectRecordsById(lastId, pageSize);
if (results.isEmpty()) {
hasMoreData = false;
} else {
// 在此执行 Elasticsearch 索引操作
}
// 更新 lastId
lastId = results.get(results.size() - 1).getId(); // 替换为用于排序的字段
}
}
1.4 扩展,当正常业务中,无法直接得到偏移量的前一个id的时候,我们可以使用覆盖索引查询这次查询的前一个最大的id:
SELECT
id
FROM sku_gross_margin
WHERE
operateTime
between
'2023-01-01 00:00:00' and '2024-07-01 09:15:08'
ORDER BY id
LIMIT #{offset-1},1
查询条件operateTime需要有索引
然后再使用1.3,就可以根据页码进行深度分页查询了
Elastic Search client的BulkRequest 更新
es的bulk请求一般是在1000到10000个request,我们可以一次放5000个request
2.1 同步方法:
在1的单个循环中,获取到结果集合,我们对结果集合进行一个for循环,每一个都创建request放进buikRequest,当满5000时,执行一次bulk操作,然后把bulkRequest重置(注意循环结束可能还有剩余的request,如果有则还要请求一次)
//buikRequest
BulkRequest request = new BulkRequest();
for (SkuGrossMarginVO skuGrossMarginVO : voList) {
UpdateRequest updateRequest = new UpdateRequest(//构造单个);
updateRequest.doc(updateMap);
request.add(updateRequest);
//es一次处理5k条,如果达到5k则先处理
if (request.numberOfActions()>=5000){
restHighLevelClientService.buik(request);
//重置请求对象
request = new BulkRequest();
}
}
//处理循环结束剩余的
if (request.numberOfActions()>0){
restHighLevelClientService.buik(request);
}
2.2异步方法:
简单版:把2.1的内容提取成一个方法,使用spring提供的@Async注解来实现异步
(要注意方法不能是private,且不能在同一个类中)
或者使用ExecutorService 执行submit
更多推荐
所有评论(0)