代码示例:

package com.atguigu.common.utils;

import com.atguigu.common.to.MemberPrice;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author mischen
 * @Description java实现批量插入数据
 * @Date 2023/11/15 16:30
 * @Version 1.0
 */
public class BatchInsertDemo {

    public static void main(String[] args) {

    }

    public void addCouponCode(List<MemberPrice> lists){
        if (CollectionUtils.isEmpty(lists)) {
            return;
        }
        int numPerTimes = 500;
        if (lists.size() <= numPerTimes) {
            batchAdd(lists); //直接插入少于500条lists
        } else {
            //获取总数量
            int maxIndex = lists.size();
            //计算页数
            int maxTimes = maxIndex / numPerTimes;
            maxTimes += (maxIndex % numPerTimes) > 0 ? 1 : 0;
            int currentTimes = 0;
            while (currentTimes < maxTimes) {
                int fromIndex = numPerTimes * currentTimes;
                int toIndex = fromIndex + numPerTimes;
                toIndex = toIndex > maxIndex ? maxIndex : toIndex;
                List<MemberPrice> subList = lists.subList(fromIndex, toIndex);
                //这里是循环插入500条lists
                batchAdd(subList); //这里是去真正操作数据库的逻辑
                currentTimes++;
            }
        }
    }
    mybatis插入,可根据业务自行修改
    public void batchAdd(List<MemberPrice> lists) {
        if (CollectionUtils.isEmpty(lists)){

        }
    }

    //利用多线程批量插入数据
    public void insert() {
        // 通过一系列操作获取到要插入的集合,在此使用list代替
        List<MemberPrice> list = new ArrayList<>();

        // 获取虚拟机可用的最大处理器数量
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        // 获取要添加的数据集合大小
        int total = list.size();
        // 每次插入的数量
        int batchSize = 1000;
        // 计算需要分多少批插入数据库(向上取整)
        int totalBatch = (total + batchSize - 1) / batchSize;
        // 手动创建线程池
        ExecutorService executor = new ThreadPoolExecutor(
                // 线程池核心线程数量
                availableProcessors,
                // 线程池最大数量
                availableProcessors + 1000,
                // 空闲线程存活时间
                1000,
                // 时间单位
                TimeUnit.MILLISECONDS,
                // 线程池所使用的缓冲队列
                new ArrayBlockingQueue<>(100),
                // 线程池对拒绝任务的处理策略
                //ThreadPoolExecutor.CallerRunsPolicy 是一种可伸缩队列策略。这种策略是当线程池中的线程数达到最大值,
                //并且已提交的任务数大于线程池中的线程数时,任务将在调用者线程中执行,而不是在线程池中的线程中执行。
                //这个策略可以控制任务的执行速度,防止线程池中的任务过多导致的资源耗尽。
                //————————————————
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 将筛选出的结果分批次添加到表中
        for (int batchIndex = 0; batchIndex < totalBatch; batchIndex++) {
            // 当前插入批次的起始索引
            int startIndex = batchIndex * batchSize;
            // 当前插入批次的结束索引
            int endIndex = Math.min((batchIndex + 1) * batchSize, total);
            // 截取本次要添加的数据
            List<MemberPrice> insertList = list.subList(startIndex, endIndex);
            // 将每个批次的插入逻辑封装成一个Runnable对象
            Runnable task = () -> {
                // 添加本批次数据到数据库中
                userMapper.batchInsert(insertList);
            };
            // 提交添加任务
            executor.submit(task);
        }
        // 关闭线程池释放资源
        executor.shutdown();
    }

}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐