切換語言為:簡體

透過雙重非同步,SpringBoot專案中 Excel 10萬行資料匯入從191秒最佳化到2秒!

  • 爱糖宝
  • 2024-11-11
  • 2030
  • 0
  • 0

在現代的企業級應用開發中,海量資料的處理效率和併發效能最佳化是一個非常重要的課題。無論是大規模資料匯入、檔案解析,還是在分散式系統中處理高併發任務,如何提升系統的處理速度、合理利用計算資源、減少執行緒上下文切換的開銷,這些都是開發者必須面對的問題。在這一背景下,執行緒池技術以及非同步程式設計逐漸成為提升系統性能的利器。

本文將深入探討如何透過合理設計執行緒池和利用非同步程式設計模型,有效最佳化大規模資料的處理效能。我們將結合 Spring Boot 框架中的 @Async 註解、自定義執行緒池、以及透過使用 EasyExcel 進行大資料量的 Excel 解析和非同步寫入資料庫的場景,詳細說明如何透過分而治之的策略,減少系統的響應時間、提高併發處理能力。同時,還將分析如何基於 CPU 和 IO 密集型任務的特性,來合理設定執行緒池的核心執行緒數、最大執行緒數等引數,以便在實際專案中能夠充分發揮硬體資源的效能。

通常我是這樣做的:

  1. 使用POI讀取需要匯入的Excel檔案;

  2. 將檔名作為表名,列標題作為列名,並將資料拼接成SQL語句;

  3. 透過JDBC或Mybatis插入到資料庫。

透過雙重非同步,SpringBoot專案中 Excel 10萬行資料匯入從191秒最佳化到2秒!

在操作中,如果檔案數量多且資料量大,處理過程可能會非常緩慢。

訪問後,感覺程式沒有響應,但實際上,它正在讀取並插入資料,只是速度很慢。

讀取包含10萬行的Excel檔案竟然耗時191秒!

我以為程式卡住了!

private void readXls(String filePath, String filename) throws Exception {
    @SuppressWarnings("resource")
    XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
    // 讀取第一個工作表
    XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
    // 獲取總行數
    int maxRow = sheet.getLastRowNum();

    StringBuilder insertBuilder = new StringBuilder();

    insertBuilder.append("insert into ").append(filename).append(" ( UUID,");

    XSSFRow row = sheet.getRow(0);
    for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
        insertBuilder.append(row.getCell(i)).append(",");
    }

    insertBuilder.deleteCharAt(insertBuilder.length() - 1);
    insertBuilder.append(" ) values ( ");

    StringBuilder stringBuilder = new StringBuilder();
    for (int i = 1; i <= maxRow; i++) {
        XSSFRow xssfRow = sheet.getRow(i);
        String id = "";
        String name = "";
        for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
            if (j == 0) {
                id = xssfRow.getCell(j) + "";
            } else if (j == 1) {
                name = xssfRow.getCell(j) + "";
            }
        }

        boolean flag = isExisted(id, name);
        if (!flag) {
            stringBuilder.append(insertBuilder);
            stringBuilder.append(''').append(uuid()).append(''').append(",");
            for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
                stringBuilder.append(''').append(value).append(''').append(",");
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            stringBuilder.append(" )").append("\n");
        }
    }

    List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
    int sum = JdbcUtil.executeDML(collect);
}

private static boolean isExisted(String id, String name) {
    String sql = "select count(1) as num from " + static_TABLE + " where ID = '" + id + "' and NAME = '" + name + "'";
    String num = JdbcUtil.executeSelect(sql, "num");
    return Integer.valueOf(num) > 0;
}

private static String uuid() {
    return UUID.randomUUID().toString().replace("-", "");
}

如何最佳化?

最佳化1:首先,查詢所有資料,將其快取到map中,然後在插入前做決策。這樣可以大大提高速度。

最佳化2:如果單個Excel檔案太大,可以考慮使用非同步和多執行緒,分批讀取多行並插入資料庫。

透過雙重非同步,SpringBoot專案中 Excel 10萬行資料匯入從191秒最佳化到2秒!

最佳化3:如果檔案太多,可以為每個Excel檔案使用一個非同步程序,實現雙重非同步讀取和插入。

透過雙重非同步,SpringBoot專案中 Excel 10萬行資料匯入從191秒最佳化到2秒!

使用雙重非同步處理後,從191秒最佳化到了2秒,你能相信嗎?

以下是非同步讀取Excel檔案和批次讀取大Excel檔案的關鍵程式碼。

非同步讀取快取的Excel Controller類

@RequestMapping(value = "/readExcelCacheAsync", method = RequestMethod.POST)
@ResponseBody
public String readExcelCacheAsync() {
    String path = "G:\Test\data\";
    try {
        // 讀取Excel之前,快取所有資料
        USER_INFO_SET = getUserInfo();

        File file = new File(path);
        String[] xlsxArr = file.list();
        for (int i = 0; i < xlsxArr.length; i++) {
            File fileTemp = new File(path + "\" + xlsxArr[i]);
            String filename = fileTemp.getName().replace(".xlsx", "");
            readExcelCacheAsyncService.readXls(path + filename + ".xlsx", filename);
        }
    } catch (Exception e) {
        logger.error("|#ReadDBCsv|#Exception: ", e);
        return "error";
    }
    return "success";
}

批次讀取超大Excel檔案

@Async("async-executor")
public void readXls(String filePath, String filename) throws Exception {
    @SuppressWarnings("resource")
    XSSFWorkbook xssfWorkbook = new XSSFWorkbook(new FileInputStream(filePath));
    // 讀取第一個工作表
    XSSFSheet sheet = xssfWorkbook.getSheetAt(0);
    // 總行數
    int maxRow = sheet.getLastRowNum();
    logger.info(filename + ".xlsx,共 " + maxRow + " 行資料!");
    StringBuilder insertBuilder = new StringBuilder();

    insertBuilder.append("insert into ").append(filename).append(" ( UUID,");

    XSSFRow row = sheet.getRow(0);
    for (int i = 0; i < row.getPhysicalNumberOfCells(); i++) {
        insertBuilder.append(row.getCell(i)).append(",");
    }

    insertBuilder.deleteCharAt(insertBuilder.length() - 1);
    insertBuilder.append(" ) values ( ");

    int times = maxRow / STEP + 1;
    for (int time = 0; time < times; time++) {
        int start = STEP * time + 1;
        int end = STEP * time + STEP;

        if (time == times - 1) {
            end = maxRow;
        }

        if (end + 1 - start > 0) {
            readExcelDataAsyncService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder);
        }
    }
}

非同步批次插入資料庫

@Async("async-executor")
public void readXlsCacheAsync(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder) {
    StringBuilder stringBuilder = new StringBuilder();
    for (int i = start; i <= end; i++) {
        XSSFRow xssfRow = sheet.getRow(i);
        String id = "";
        String name = "";
        for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
            if (j == 0) {
                id = xssfRow.getCell(j) + "";
            } else if (j == 1) {
                name = xssfRow.getCell(j) + "";
            }
        }

        // 在讀取Excel之前,先快取所有資料,然後做決策
        boolean flag = isExisted(id, name);
        if (!flag) {
            stringBuilder.append(insertBuilder);
            stringBuilder.append(''').append(uuid()).append(''').append(",");
            for (int j = 0; j < row.getPhysicalNumberOfCells(); j++) {
                stringBuilder.append(''').append(value).append(''').append(",");
            }
            stringBuilder.deleteCharAt(stringBuilder.length() - 1);
            stringBuilder.append(" )").append("\n");
        }
    }

    List<String> collect = Arrays.stream(stringBuilder.toString().split("\n")).collect(Collectors.toList());
    if (collect != null && collect.size() > 0) {
        int sum = JdbcUtil.executeDML(collect);
    }
}

private boolean isExisted(String id, String name) {
    return ReadExcelCacheAsyncController.USER_INFO_SET.contains(id + "," + name);
}

非同步執行緒池工具類

@Async 的目的是非同步處理任務。

  1. 在方法上新增 @Async 表明該方法是非同步的。

  2. 在類上新增 @Async 表示該類中的所有方法都是非同步的。

  3. 使用此註解的類必須由 Spring 管理。

  4. 必須在啟動類或配置類中新增 @EnableAsync 註解,@Async 才能生效。

在使用 @Async 時,如果不指定執行緒池的名稱,即不自定義執行緒池,預設會使用一個執行緒池。這個預設執行緒池是 Spring 的 SimpleAsyncTaskExecutor。

預設執行緒池的預設配置如下:

  1. 預設核心執行緒數:8。

  2. 最大執行緒數:Integer.MAX_VALUE。

  3. 佇列型別:LinkedBlockingQueue。

  4. 容量:Integer.MAX_VALUE。

  5. 空閒執行緒保留時間:60秒。

  6. 執行緒池拒絕策略:AbortPolicy。

從最大執行緒數可以看出,在併發情況下,執行緒會無限制地建立。

你也可以透過 yml 檔案重新配置:

spring:
  task:
    execution:
      pool:
        max-size: 10
        core-size: 5
        keep-alive: 3s
        queue-capacity: 1000
        thread-name-prefix: my-executor

你也可以自定義執行緒池。以下是使用 @Async 自定義執行緒池的簡單程式碼實現:

@EnableAsync // 支援非同步操作
@Configuration
public class AsyncTaskConfig {

    /**
     * 來自 com.google.guava 的執行緒池
     * @return
     */
    @Bean("my-executor")
    public Executor firstExecutor() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-executor").build();
        // 獲取 CPU 處理器數量
        int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(curSystemThreads, 100,
                200, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
        threadPool.allowsCoreThreadTimeOut();
        return threadPool;
    }

    /**
     * Spring 的執行緒池
     * @return
     */
    @Bean("async-executor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 核心執行緒數
        taskExecutor.setCorePoolSize(24);
        // 執行緒池維護的最大執行緒數,超出核心執行緒數的執行緒僅當緩衝佇列滿時纔會建立
        taskExecutor.setMaxPoolSize(200);
        // 緩衝佇列
        taskExecutor.setQueueCapacity(50);
        // 超出核心執行緒數的執行緒空閒時間,超時後將被銷燬
        taskExecutor.setKeepAliveSeconds(200);
        // 非同步方法內部執行緒名
        taskExecutor.setThreadNamePrefix("async-executor-");

        /**
         * 當執行緒池的任務快取佇列已滿,且執行緒池中的執行緒數量已達到最大值時,如果還有任務到來,將採用任務拒絕策略。
         * 通常有以下四種策略:
         * ThreadPoolExecutor.AbortPolicy:拋棄任務並丟擲 RejectedExecutionException 異常。
         * ThreadPoolExecutor.DiscardPolicy:拋棄任務,但不丟擲異常。
         * ThreadPoolExecutor.DiscardOldestPolicy:拋棄佇列最前面的任務,然後嘗試執行當前任務(重複此過程)。
         * ThreadPoolExecutor.CallerRunsPolicy:重試新增當前任務,自動呼叫執行方法,直到成功。
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

非同步失效的原因

  1. 被 @Async 註解的方法不是 public 的;

  2. 被 @Async 註解的方法的返回值型別只能是 void 或 Future;

  3. 被 @Async 註解的方法如果是靜態的也會失效;

  4. 未新增 @EnableAsync 註解;

  5. 呼叫者和被 @Async 註解的方法不能在同一個類中;

  6. 對非同步方法使用 @Transactional 是無效的,但對非同步方法內呼叫的方法加上 @Transactional 是有效的。

執行緒池中設定核心執行緒數的問題

我尚未有時間詳細探討:線上程池中設定 CorePoolSize 和 MaxPoolSize 的最適宜和最高效的數量是多少。

藉此機會進行了一些測試。

我記得有個關於 CPU 處理器數量的說法

將 CorePoolSize 設定為 CPU 處理器的數量時,效率最高嗎?

// 獲取 CPU 處理器數量
int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;

Runtime.getRuntime().availableProcessors() 會獲取 CPU 核心執行緒數,代表計算資源。

  • 對於 CPU 密集型任務,執行緒池的大小設定為 N,與 CPU 執行緒數一致,這可以最大限度地減少執行緒間的上下文切換。但在實際開發中,一般設定為 N+1,以防止執行緒由於不可預見的情況而阻塞。如果發生阻塞,多出來的執行緒可以繼續執行任務,保證 CPU 的高效利用。

  • 對於 IO 密集型任務,執行緒池的大小設定為 2N。這個數值是根據業務壓力測試得出的,或者在不涉及業務時使用推薦值。

實際中,執行緒池的具體大小需要根據壓力測試以及機器的當前狀態進行調整。

如果執行緒池過大,會導致 CPU 持續切換,系統整體效能並不會有顯著提高,反而可能會變慢。

我電腦的 CPU 處理器數量為 24。

那麼一次讀取多少行效率最高呢?

測試中,Excel 檔案包含 10 萬行資料。10 萬 / 24 = 4166,因此我設定為 4200。這是最有效的設定嗎?

測試過程中似乎的確如此。

我記得大家習慣性地將核心執行緒數(CorePoolSize)和最大執行緒數(MaxPoolSize)設定為相同的數值,通常是 200。

這只是隨機選擇,還是基於經驗的?

測試發現,當 CorePoolSize 和 MaxPoolSize 都設定為 200 時,最初同時開啟了 150 個執行緒工作。

為什麼會這樣呢?

經過數十次測試後

  1. 發現核心執行緒數並沒有太大區別;

  2. 關鍵是每次讀取和儲存的行數,不能太多,儲存速度會逐漸減慢;

  3. 也不能太少,如果少於 150 個執行緒,會導致執行緒阻塞,反而減慢程序。

IV.使用 EasyExcel 讀取並插入資料庫

我不會寫 EasyExcel 的雙非同步最佳化。大家要記住避免掉進低階勤奮的陷阱。

ReadEasyExcelController

@RequestMapping(value = "/readEasyExcel", method = RequestMethod.POST)
@ResponseBody
public String readEasyExcel() {
    try {
        String path = "G:\Test\data\";
        String[] xlsxArr = new File(path).list();
        for (int i = 0; i < xlsxArr.length; i++) {
            String filePath = path + xlsxArr[i];
            File fileTemp = new File(path + xlsxArr[i]);
            String fileName = fileTemp.getName().replace(".xlsx", "");
            List<UserInfo> list = new ArrayList<>();
            EasyExcel.read(filePath, UserInfo.class, new ReadEasyExeclAsyncListener(readEasyExeclService, fileName, batchCount, list)).sheet().doRead();
        }
    }catch (Exception e){
        logger.error("readEasyExcel Exception:",e);
        return "error";
    }
    return "success";
}

ReadEasyExeclAsyncListener

public ReadEasyExeclService readEasyExeclService;
// 表名
public String TABLE_NAME;
// 批次插入閾值
private int BATCH_COUNT;
// 資料收集
private List<UserInfo> LIST;

public ReadEasyExeclAsyncListener(ReadEasyExeclService readEasyExeclService, String tableName, int batchCount, List<UserInfo> list) {
    this.readEasyExeclService = readEasyExeclService;
    this.TABLE_NAME = tableName;
    this.BATCH_COUNT = batchCount;
    this.LIST = list;
}

@Override
public void invoke(UserInfo data, AnalysisContext analysisContext) {
    data.setUuid(uuid());
    data.setTableName(TABLE_NAME);
    LIST.add(data);
    if (LIST.size() >= BATCH_COUNT) {
        // 批次入庫
        readEasyExeclService.saveDataBatch(LIST);
    }
}

@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
    if (LIST.size() > 0) {
        // 最後一批入庫
        readEasyExeclService.saveDataBatch(LIST);
    }
}

public static String uuid() {
    return UUID.randomUUID().toString().replace("-", "");
}

ReadEasyExeclServiceImpl

@Service
public class ReadEasyExeclServiceImpl implements ReadEasyExeclService {

    @Resource
    private ReadEasyExeclMapper readEasyExeclMapper;

    @Override
    public void saveDataBatch(List<UserInfo> list) {
        // Insert into the database via mybatis
        readEasyExeclMapper.saveDataBatch(list);
      // Insert into the database via JDBC
        // insertByJdbc(list);
        list.clear();
    }
    
    private void insertByJdbc(List<UserInfo> list){
        List<String> sqlList = new ArrayList<>();
        for (UserInfo u : list){
            StringBuilder sqlBuilder = new StringBuilder();
            sqlBuilder.append("insert into ").append(u.getTableName()).append(" ( UUID,ID,NAME,AGE,ADDRESS,PHONE,OP_TIME ) values ( ");
            sqlBuilder.append("'").append(ReadEasyExeclAsyncListener.uuid()).append("',")
                            .append("'").append(u.getId()).append("',")
                            .append("'").append(u.getName()).append("',")
                            .append("'").append(u.getAge()).append("',")
                            .append("'").append(u.getAddress()).append("',")
                            .append("'").append(u.getPhone()).append("',")
                            .append("sysdate )");
            sqlList.add(sqlBuilder.toString());
        }

        JdbcUtil.executeDML(sqlList);
    }
}

UserInfo

@Data
public class UserInfo {

    private String tableName;

    private String uuid;

    @ExcelProperty(value = "ID")
    private String id;

    @ExcelProperty(value = "NAME")
    private String name;

    @ExcelProperty(value = "AGE")
    private String age;

    @ExcelProperty(value = "ADDRESS")
    private String address;

    @ExcelProperty(value = "PHONE")
    private String phone;
}

結語

在處理高併發、大資料匯入等場景時,非同步程式設計和執行緒池技術提供了一種極具效率的解決方案。透過合理配置執行緒池的核心執行緒數、最大執行緒數、佇列長度等引數,能夠在確保系統穩定性的前提下,大幅提升併發處理能力。而透過非同步程式設計,我們可以有效避免執行緒阻塞、減少資源浪費,並讓系統在面對大量請求時依然能夠保持較高的響應速度。

本文的示例透過 Spring Boot 的 @Async 註解和自定義執行緒池,在實際的 EasyExcel 大資料匯入場景下,驗證了這種技術組合的高效性和實用性。此外,透過對 CPU 密集型任務和 IO 密集型任務的深入分析,開發者能夠根據自身專案的特點,選擇合適的執行緒池配置策略,最大化資源利用率和效能表現。

在實際應用中,執行緒池和非同步程式設計不僅適用於大資料匯入,還可以推廣到包括檔案處理、網路請求、日誌處理等各類需要併發處理的場景中。因此,掌握並靈活運用這些技術,將為我們的系統性能最佳化提供堅實的基礎,使我們能夠應對更復雜、更苛刻的業務需求。


作者:皮皮林551
連結:https://juejin.cn/post/7435607457726627840

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.