前言

在企业经常使用到数据导出excel表格,但是在上千上百万条数据量比较多的情况下还使用传统的页面点击导出然后等待下载,数据量比较多导出时间比较长等待时间久,还有可能出现数据大造成数据库压力导出崩溃情况。针对这一情况也有很多的解决方法,为了解决这个情况问题,下面我们通过线程池来实现异步导出excel表格数据。

一、使用线程池好处

线程池(ThreadPoolExecutor)是用于管理线程执行的一个重要工具,它通过重用一组线程来执行任务,从而减少线程的创建与销毁带来的开销,提高系统性能。

  • 1、降低资源的消耗:线程可以重复使用,不需要在创建线程和消耗线程上浪费资源;
  • 2、提高响应速度:任务到达时,线程可以复用已有的线程,及时响应;
  • 3、可管理性:无限制的创建线程会降低系统效率,线程池可以对线程进行管理、监控、调优。

二、线程池详细参数说明

参数名称

描述

类型

默认值

备注

corePoolSize

线程池的核心线程数。线程池始终保持的最小线程数,即使线程处于空闲状态,也不会被销毁。

int

0

设定核心线程数。核心线程空闲时如果超过指定时间,则会被回收,除非 allowCoreThreadTimeOut 设置为 true。

maximumPoolSize

线程池的最大线程数。线程池最多能创建的线程数。如果线程池中正在执行的线程数达到核心线程数时,额外任务会在队列中等待,若队列已满,则创建新线程,直到最大线程数。

int

Integer.MAX_VALUE

maximumPoolSize
是线程池能够允许的最大线程数,超出该数目将采用拒绝策略。

keepAliveTime

线程空闲的最大时间。当线程池中的线程数量超过 corePoolSize
时,空闲线程超过 keepAliveTime
后会被销毁。

long

60L

此参数作用于线程池中大于 corePoolSize 的线程数。

unit

keepAliveTime 的时间单位。用于指定空闲时间的时间单位。

TimeUnit

TimeUnit.SECONDS

常见的时间单位有:TimeUnit.SECONDS、TimeUnit.MILLISECONDS、TimeUnit.MINUTES 等。

workQueue

存放任务的阻塞队列。队列用于存储等待执行的任务。若线程池的线程数达到 corePoolSize,新任务会被添加到队列中等待处理。

BlockingQueue

LinkedBlockingQueue

常见队列类型包括:LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue 等。队列决定任务的缓存策略。

threadFactory

用于创建新线程的工厂。可以通过 Executors.defaultThreadFactory()
获取一个默认的线程工厂,也可以自定义线程工厂来创建线程。

ThreadFactory

Executors.defaultThreadFactory()

默认的线程工厂会创建一个新线程,并为线程命名(如:pool-1-thread-1)。

handler

拒绝策略。用于处理任务队列已满且线程池中的线程数达到最大值时的任务。可以通过自定义策略来控制任务丢弃、抛出异常或由调用者执行等行为。

RejectedExecutionHandler

ThreadPoolExecutor.AbortPolicy

常见拒绝策略包括:AbortPolicy
(抛异常),CallerRunsPolicy
(由调用者线程执行),DiscardPolicy(直接丢弃任务),DiscardOldestPolicy(丢弃最旧的任务)。

参数详细解析
  1. corePoolSize

    • 设定线程池的核心线程数。即使当前没有任务可执行,核心线程数的线程也会保持活动状态。
    • 线程池创建时会启动 corePoolSize 数量的线程。
  2. maximumPoolSize

    • 设定线程池的最大线程数。线程池在需要时会创建新的线程,最多创建到 maximumPoolSize。
    • 当任务队列满且线程数达到 corePoolSize 时,会尝试创建新线程,直到达到最大线程数。超过这个数目就会执行拒绝策略。
  3. keepAliveTime

    • 设定线程空闲时的最大存活时间。对于非核心线程(线程池中的线程数大于 corePoolSize 时创建的线程),如果超过 keepAliveTime 还没有任务,它们会被销毁。
    • 这个时间参数只有在线程池中线程数量大于 corePoolSize 时才会起作用。
  4. unit

    • `keepAliveTime 的单位。单位可以是秒、毫秒、分钟等,通过 TimeUnit 枚举类来表示。
    • 常见的单位有 TimeUnit.SECONDS、TimeUnit.MILLISECONDS 和 TimeUnit.MINUTES。
  5. workQueue

    • 任务队列,用于存储等待执行的任务。
    • 如果当前的线程池没有足够的线程来执行任务,任务会被添加到队列中等待。如果队列已满,线程池会创建新的线程,直到达到 maximumPoolSize。
  6. threadFactory

    • 线程工厂,用于定制线程的创建。可以用来设置线程的优先级、名称等。
    • 默认工厂通过 Executors.defaultThreadFactory() 创建一个新的线程并赋予一个默认的名称,如 pool-1-thread-1。
  7. handler

    • 拒绝策略,处理任务队列已满、线程池已达到最大线程数时的情况。
    • 常见的拒绝策略有:
      • `AbortPolicy(默认):抛出 RejectedExecutionException 异常。
      • `CallerRunsPolicy:由调用者线程执行该任务。
      • `DiscardPolicy:丢弃当前任务。
      • `DiscardOldestPolicy:丢弃队列中最旧的任务,并尝试执行当前任务。

    import java.util.concurrent.*;

    public class ThreadPoolExecutorExample {
    public static void main(String[] args) {
    int corePoolSize = 2;
    int maximumPoolSize = 4;
    long keepAliveTime = 60L;
    TimeUnit unit = TimeUnit.SECONDS;
    BlockingQueue workQueue = new LinkedBlockingQueue<>(10);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
        );
    
        // 创建并提交任务
        for (int i = 0; i < 20; i++) {
            executor.submit(new Task(i));
        }
    
        // 优雅关闭线程池
        executor.shutdown();
    }
    
    static class Task implements Runnable {
        private final int taskId;
    
        public Task(int taskId) {
            this.taskId = taskId;
        }
    
        @Override
        public void run() {
            System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());
        }
    }
    

    }

三、使用单线程异步导出excel代码分析

1、异步导出公共调用方法

  • 1.这个方法是通用的对外方法(synExportCommom),sourceType是知道导出数据的模板类型,queryDTO是接口的请求参数。

  • 2.uploadObsAttachmentSyncService这个Service层主要存储导出的excel文件上传到远程服务器的路径。

  • executor这个线程是父级线程也是主线程。

    private final static ExecutorService executor = Executors.newFixedThreadPool(4);

    @Override
    public void synExportCommom(HttpServletResponse response, String sourceType, QeryDTO queryDTO) {
    String employeeNum = “0110210”
    long count = uploadObsAttachmentSyncService.countHandling(employeeNum, sourceType);
    // 先校验是否有任务正则执行中
    if (count > 0) {
    throw new BusinessException(“有任务正在执行中,请稍后再试!”);
    }
    String attachName = CtmContractAttachType.getName(sourceType);
    log.info(“开始导出” + attachName + “!,下载人员:” + employeeNum);
    CtmUploadObsAttachmentSyncDTO obsAttachmentSyncDTO = uploadObsAttachmentSyncService.createObsAttachmentSyncDTO(employeeNum, sourceType, attachName);
    long uploadAttachmentId = uploadObsAttachmentSyncService.saveUploadAttachment(obsAttachmentSyncDTO);
    executor.execute(() -> {
    try {
    syncStartExportContractInfo(response, queryDTO, uploadAttachmentId, sourceType);
    } catch (Exception e) {
    log.error(“导出信息” + attachName + “附件失败”, e);
    }
    });
    log.info(“导出信息” + attachName + “全部完成同步!,下载人员:” + employeeNum);
    }

2、处理导出数据子线程方法

  • 1、首先创建一个临时文件,用于存储处理完的excel数据:****

    File tempFile= File.createTempFile(obsAttachmentSync.getAttachName(), “.csv”);
    OutputStream outputStream = Files.newOutputStream(tempFile.getAbsoluteFile().toPath());

  • 2、创建一个easyexcel

    OutputStream outputStream = Files.newOutputStream(tempFile.getAbsoluteFile().toPath());
    ExcelUtil.resetResponseCSV(“信息列表”, response);
    ExcelWriter excelWriter = EasyExcel.write(outputStream).autoCloseStream(true).registerConverter(new ExcelBigNumberConvert()).build();

  • 3、创建一个子线程池,用于处理数据

    // 创建一个线程池
    ExecutorService executor = Executors.newFixedThreadPool(1);

    // 存储任务的 Future 对象
    List<Future<?>> futures = new ArrayList<>(); Future<?> future = executor.submit(() -> {
    exportContractInfo(excelWriter, queryDTO, pageSize, sourceType);
    });
    try {
    future.get(); // 获取任务结果,如果有异常会在这里抛出
    } catch (Exception e) {
    log.error(“导出信息时发生异常2”, e);
    throw new BusinessException(“导出信息时发生异常1”);
    }

  • 4、上传文件到远程OBS服务器,通过附件信息记录表中的状态:处理中、处理完成、处理失败

    • 当子线程执行完成就会把附件记录表中的状态改为处理完成。
    • 状态为处理完成后通过前端获取返回的附件链接进行下载。
  • 关键代码全部流程

    /**

    • 使用流式处理(分块读取数据)导出个性化价格信息

    • @param response HTTP响应对象,用于导出文件

    • @param queryDTO 查询条件对象,用于指定导出的数据范围

    • @param uploadAttachmentId 上传附件的ID,用于关联导出的文件

    • @param sourceType 导出数据的来源类型,用于区分数据源

    • @param ctmContractAuth 合同权限对象,用于控制导出数据的权限范围

    • @throws IOException 当文件操作发生错误时抛出此异常
      */
      private void syncStartExportContractInfo(HttpServletResponse response, QeryDTO queryDTO, long uploadAttachmentId,
      String sourceType) throws IOException {
      // 定义每次读取的数据量
      int pageSize = 10000;
      File tempFile = null;
      CtmUploadObsAttachmentSync obsAttachmentSync = null;
      try {
      // 获取上传附件的详细信息
      obsAttachmentSync = uploadObsAttachmentSyncService.getById(uploadAttachmentId);
      // 创建一个临时文件用于存储导出的数据
      tempFile = File.createTempFile(obsAttachmentSync.getAttachName(), “.csv”);
      // 处理查询条件
      exportQueryCondition(queryDTO);
      // 创建一个easyexcel
      OutputStream outputStream = Files.newOutputStream(tempFile.getAbsoluteFile().toPath());
      ExcelUtil.resetResponseCSV(“信息列表”, response);
      ExcelWriter excelWriter = EasyExcel.write(outputStream).autoCloseStream(true).registerConverter(new ExcelBigNumberConvert()).build();

       // 创建一个线程池
       ExecutorService executor = Executors.newFixedThreadPool(1);
      
       // 存储任务的 Future 对象
       List<Future<?>> futures = new ArrayList<>();
       // 提交导出数据的任务到线程池
       Future<?> future = executor.submit(() -> {
           exportCommonInfo(excelWriter, queryDTO, pageSize, sourceType);
       });
        // 获取任务结果,如果有异常会在这里抛出
       try {
           future.get();  // 获取任务结果,如果有异常会在这里抛出
       } catch (Exception e) {
           log.error("导出信息时发生异常2", e);
           throw new BusinessException("导出信息时发生异常1");
       }
      
       // 等待所有任务完成
       executor.shutdown();
       executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
       // 完成写入
       excelWriter.finish();
       // 上传文件到OBS
       MultipartFile multipartFile = new FileToMultipartFile(tempFile);
       Map<String, String> fileUpload = huaWeiYunOBSService.fileUpload(multipartFile, "xxx/xxx/xxx/");
       log.info("OBS上传返回链接:" + JSON.toJSONString(fileUpload));
       // 更新附件的OBS相关信息
       obsAttachmentSync.setObsFileName(fileUpload.get("obsFileName"));
       obsAttachmentSync.setObsUrl(fileUpload.get("obsUrl"));
       obsAttachmentSync.setExecuteStatus(CtmContractAttachExecuteStatus.SUCCESS.getCode());
       obsAttachmentSync.setErrorMessage(CtmContractAttachExecuteStatus.SUCCESS.getName());
        // 计算文件大小
       long length = tempFile.length();// 文件大小(B)
       obsAttachmentSync.setObsFileSize(String.valueOf(length));
       obsAttachmentSync.setCompleteTime(new Date());
      

      } catch (Exception e) {
      obsAttachmentSync.setExecuteStatus(CtmContractAttachExecuteStatus.FAILURE.getCode());
      obsAttachmentSync.setErrorMessage(CtmContractAttachExecuteStatus.FAILURE.getName());
      log.error(“导出信息时发生异常3”, e);
      throw new BusinessException(“导出信息时发生异常3”);
      } finally {
      obsAttachmentSync.setUpdateTime(new Date());
      // 更新附件信息
      uploadObsAttachmentSyncService.updateById(obsAttachmentSync);
      // 删除临时文件
      Files.delete(tempFile.getAbsoluteFile().toPath());
      }
      }

3、处理数据表格的公共方法

导出信息到Excel
  • 获取附件名称:通过sourceType获取对应的附件名称。

  • 创建Excel工作表:根据sourceType创建不同类型的Excel工作表。

  • 导出数据:调用exportData方法,分页查询并写入数据到Excel中。

  • 复杂逻辑分点描述:

    • 如果sourceType为DETAIL_BASE_INFO,则导出详细信息。
    • 如果sourceType为INCLUSIVE_MP_INFO,则导出可选白名单套餐信息。
    • 如果sourceType为EXCLUSIVE_MP_INFO,则导出非使用黑名单套餐信息。
    • 如果sourceType为INDIVIDUATION_PRICE,则导出个性化价格信息,分为4个子表。

    /**

    • 导出信息到Excel

    • @param excelWriter Excel写入器,用于将数据写入Excel文件

    • @param queryDTO 查询DTO,包含需要导出的合同信息的查询条件

    • @param pageSize 每页记录数,用于分页导出数据

    • @param sourceType 数据源类型,决定导出哪种类型的合同信息
      */
      private void exportCommonInfo(ExcelWriter excelWriter, QeryDTO queryDTO, int pageSize, String sourceType) {
      // 根据数据源类型获取附件名称
      String attachName = CtmContractAttachType.getName(sourceType);
      // 创建一个写入工作表对象
      WriteSheet sheet = new WriteSheet();

      // 根据不同的数据源类型,创建相应的工作表并导出数据
      if (sourceType.equals(CtmContractAttachType.DETAIL_BASE_INFO.getCode())) {
      // 创建基础信息工作表
      sheet = EasyExcel.writerSheet(attachName).head(CtmContractBasicDetailExcelData.class).build();
      // 导出合同详细基础信息数据
      exportData(excelWriter, sheet, queryDTO, attachName,
      (pageNo) -> exportContractDetailInfo(pageNo, pageSize, queryDTO));
      }
      if (sourceType.equals(CtmContractAttachType.INCLUSIVE_MP_INFO.getCode())) {
      // 创建包容性信息工作表
      sheet = EasyExcel.writerSheet(attachName).head(CtmPbcInclusiveMpVO.class).build();
      // 导出合同包容性信息数据
      exportData(excelWriter, sheet, queryDTO, attachName,
      (pageNo) -> CommonPage.restPage(contractBasicMdMapper.listContractInclusiveMp(new Page<>(pageNo, pageSize), queryDTO)));
      }
      if (sourceType.equals(CtmContractAttachType.EXCLUSIVE_MP_INFO.getCode())) {
      // 创建排他性信息工作表
      sheet = EasyExcel.writerSheet(attachName).head(CtmPbcExclusiveMpVO.class).build();
      // 导出合同排他性信息数据
      exportData(excelWriter, sheet, queryDTO, attachName,
      (pageNo) -> CommonPage.restPage(contractBasicMdMapper.listContractExclusiveMp(new Page<>(pageNo, pageSize), queryDTO)));
      }
      if (sourceType.equals(CtmContractAttachType.INDIVIDUATION_PRICE.getCode())) {
      // 创建个性化价格信息工作表,可能有多个工作表
      for (int i = 1; i <= 4; i++) {
      final int finalI = i;
      sheet = createSheet(finalI);
      // 导出个性化价格信息数据到每个工作表
      exportData(excelWriter, sheet, queryDTO, sheet.getSheetName(), (pageNo) -> fetchData(finalI, pageNo, pageSize, queryDTO));
      }
      }
      }

通用的导出数据方法
/**
 * 通用的导出数据方法
 * 
 * 该方法负责将从数据库中分页获取的数据导出到Excel文件中它使用了同步块来避免多线程环境下的并发写入问题
 * 
 * @param excelWriter Excel写入器,用于将数据写入Excel文件
 * @param sheet 工作表配置,包含数据写入的具体配置信息
 * @param queryDTO 查询DTO,包含查询条件
 * @param sheetName 工作表名称
 * @param fetcher 分页获取数据的接口,用于从数据库中分页获取数据
 */
private <T> void exportData(ExcelWriter excelWriter, WriteSheet sheet, QeryDTO queryDTO, String sheetName, PagingFetcher<T> fetcher) {
    int pageNo = 1;
    try {
        while (true) {
            CommonPage<T> pageData = fetcher.fetch(pageNo);
            // 同步写入数据,避免多线程问题
            synchronized (excelWriter) {
                excelWriter.write(pageData.getList(), sheet);
            }
            if (CollectionUtil.isEmpty(pageData.getList())) {
                break;
            }
            log.info("线程名称:{},Sheet名称:{},记录数:{},页码:{}",
                     Thread.currentThread().getName(), sheetName, pageData.getList().size(), pageNo);
            pageNo++;
        }
    } catch (Exception e) {
        log.error("导出合同信息时发生异常,SourceType: {},Sheet名称: {}", sheetName, e.getMessage(), e);
        throw new BusinessException("导出合同信息时发生异常,SourceType: " + sheetName, e);
    }
}

// 使用函数式接口定义分页查询的获取方式
@FunctionalInterface
interface PagingFetcher<T> {
    CommonPage<T> fetch(int pageNo);
}
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐