新聞中心
上面的問題其實是前段時間接到一個真實的業(yè)務需求,將一個老系統(tǒng)歷史數據通過線下文件的方式遷移到新的生產系統(tǒng)。
創(chuàng)新互聯公司長期為上千余家客戶提供的網站建設服務,團隊從業(yè)經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯網生態(tài)環(huán)境。為白銀區(qū)企業(yè)提供專業(yè)的成都網站建設、網站制作,白銀區(qū)網站改版等技術服務。擁有十載豐富建站經驗和眾多成功案例,為您定制開發(fā)。
由于老板們已經敲定了新系統(tǒng)上線時間,所以只留給我一周的時間將歷史數據導入生產系統(tǒng)。
由于時間緊,而數據量又超大,所以在設計的過程想到一下解決辦法:
- 拆分文件
- 多線程導入
拆分文件
首先我們可以寫個小程序,或者使用拆分命令 ??split?? 將這個超大文件拆分一個個小文件。
-- 將一個大文件拆分成若干個小文件,每個文件 100000 行
split -l 100000 largeFile.txt -d -a 4 smallFile_
這里之所以選擇先將大文件拆分,主要考慮到兩個原因:
1、如果程序直接讀取這個大文件,假設讀取一半的時候,程序突然宕機,這樣就會直接丟失文件讀取的進度,又需要重新開頭讀取。
而文件拆分之后,一旦小文件讀取結束,我們可以將小文件移動一個指定文件夾。
這樣即使應用程序宕機重啟,我們重新讀取時,只需要讀取剩余的文件。
2、一個文件,只能被一個應用程序讀取,這樣就限制了導入的速度。
而文件拆分之后,我們可以采用多節(jié)點部署的方式,水平擴展。每個節(jié)點讀取一部分文件,這樣就可以成倍的加快導入速度。
多線程導入
當我們拆分完文件,接著我們就需要讀取文件內容,進行導入。
之前拆分的時候,設置每個小文件包含 10w 行的數據。由于擔心一下子將 10w 數據讀取應用中,導致堆內存占用過高,引起頻繁的 「Full GC」,所以下面采用流式讀取的方式,一行一行的讀取數據。
當然了,如果拆分之后文件很小,或者說應用的堆內存設置很大,我們可以直接將文件加載到應用內存中處理。這樣相對來說簡單一點。
逐行讀取的代碼如下:
File file = ...
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
while (iterator.hasNext()) {
String line=iterator.nextLine();
convertToDB(line);
}
}
上面代碼使用 ??commons-io?? 中的 ??LineIterator??類,這個類底層使用了 ??BufferedReader?? 讀取文件內容。它將其封裝成迭代器模式,這樣我們可以很方便的迭代讀取。
如果當前使用 JDK1.8 ,那么上述操作更加簡單,我們可以直接使用 JDK 原生的類 ??Files??將文件轉成 ??Stream?? 方式讀取,代碼如下:
Files.lines(Paths.get("文件路徑"), Charset.defaultCharset()).forEach(line -> {
convertToDB(line);
});
其實仔細看下 ??Files#lines??底層源碼,其實原理跟上面的 ??LineIterator??類似,同樣也是封裝成迭代器模式。
多線程的引入存在的問題
上述讀取的代碼寫起來不難,但是存在效率問題,主要是因為只有單線程在導入,上一行數據導入完成之后,才能繼續(xù)操作下一行。
為了加快導入速度,那我們就多來幾個線程,并發(fā)導入。
多線程我們自然將會使用線程池的方式,相關代碼改造如下:
File file = ...;
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
60,
TimeUnit.MINUTES,
// 文件數量,假設文件包含 10W 行
new ArrayBlockingQueue<>(10*10000),
// guava 提供
new ThreadFactoryBuilder().setNameFormat("test-%d").build());
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
while (iterator.hasNext()) {
String line = iterator.nextLine();
executorService.submit(() -> {
convertToDB(line);
});
}
}
上述代碼中,每讀取到一行內容,就會直接交給線程池來執(zhí)行。
我們知道線程池原理如下:
- 如果核心線程數未滿,將會直接創(chuàng)建線程執(zhí)行任務。
- 如果核心線程數已滿,將會把任務放入到隊列中。
- 如果隊列已滿,將會再創(chuàng)建線程執(zhí)行任務。
- 如果最大線程數已滿,隊列也已滿,那么將會執(zhí)行拒絕策略。
線程池執(zhí)行流程圖
由于我們上述線程池設置的核心線程數為 5,很快就到達了最大核心線程數,后續(xù)任務只能被加入隊列。
為了后續(xù)任務不被線程池拒絕,我們可以采用如下方案:
- 將隊列容量設置成很大,包含整個文件所有行數
- 將最大線程數設置成很大,數量大于整個文件所有行數
以上兩種方案都存在同樣的問題,第一種是相當于將文件所有內容加載到內存,將會占用過多內存。
而第二種創(chuàng)建過多的線程,同樣也會占用過多內存。
一旦內存占用過多,GC 無法清理,就可能會引起頻繁的 「Full GC」,甚至導致 「OOM」,導致程序導入速度過慢。
當然了,我們還可以第三種方案,綜合前兩種,設置合適隊列長度,以及合適最大線程數。不過呢,「合適」這個度真不好把握,另外也還是有 **「OOM」 **問題。
所以為了解決這個問題,日思夜想研究出兩個解決方案:
CountDownLatch批量執(zhí)行- 擴展線程池
CountDownLatch 批量執(zhí)行
JDK 提供的 ??CountDownLatch??,可以讓主線程等待子線程都執(zhí)行完成之后,再繼續(xù)往下執(zhí)行。
利用這個特性,我們可以改造多線程導入的代碼,主體邏輯如下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
// 存儲每個任務執(zhí)行的行數
List lines = Lists.newArrayList();
// 存儲異步任務
List tasks = Lists.newArrayList();
while (iterator.hasNext()) {
String line = iterator.nextLine();
lines.add(line);
// 設置每個線程執(zhí)行的行數
if (lines.size() == 1000) {
// 新建異步任務,注意這里需要創(chuàng)建一個 List
tasks.add(new ConvertTask(Lists.newArrayList(lines)));
lines.clear();
}
if (tasks.size() == 10) {
asyncBatchExecuteTask(tasks);
}
}
// 文件讀取結束,但是可能還存在未被內容
tasks.add(new ConvertTask(Lists.newArrayList(lines)));
// 最后再執(zhí)行一次
asyncBatchExecuteTask(tasks);
}
這段代碼中,每個異步任務將會導入 1000 行數據,等積累了 10 個異步任務,然后將會調用 ??asyncBatchExecuteTask?? 使用線程池異步執(zhí)行。
/**
* 批量執(zhí)行任務
*
* @param tasks
*/
private static void asyncBatchExecuteTask(Listtasks) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
for (ConvertTask task : tasks) {
task.setCountDownLatch(countDownLatch);
executorService.submit(task);
}
// 主線程等待異步線程 countDownLatch 執(zhí)行結束
countDownLatch.await();
// 清空,重新添加任務
tasks.clear();
}
??asyncBatchExecuteTask?? 方法內將會創(chuàng)建 ??CountDownLatch??,然后主線程內調用 ??await??方法等待所有異步線程執(zhí)行結束。
??ConvertTask?? 異步任務邏輯如下:
/**
* 異步任務
* 等數據導入完成之后,一定要調用 countDownLatch.countDown()
* 不然,這個主線程將會被阻塞,
*/
private static class ConvertTask implements Runnable {
private CountDownLatch countDownLatch;
private Listlines;
public ConvertTask(Listlines) {
this.lines = lines;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for (String line : lines) {
convertToDB(line);
}
} finally {
countDownLatch.countDown();
}
}
}
??ConvertTask??任務類邏輯就非常簡單,遍歷所有行,將其導入到數據庫中。所有數據導入結束,調用 ??countDownLatch#countDown??。
一旦所有異步線程執(zhí)行結束,調用 ??countDownLatch#countDown??,主線程將會被喚醒,繼續(xù)執(zhí)行文件讀取。
雖然這種方式解決上述問題,但是這種方式,每次都需要積累一定任務數才能開始異步執(zhí)行所有任務。
另外每次都需要等待所有任務執(zhí)行結束之后,才能開始下一批任務,批量執(zhí)行消耗的時間等于最慢的異步任務消耗的時間。
這種方式線程池中線程存在一定的閑置時間,那有沒有辦法一直壓榨線程池,讓它一直在干活呢?
擴展線程池
回到最開始的問題,文件讀取導入,其實就是一個「生產者-消費者」消費模型。
主線程作為生產者不斷讀取文件,然后將其放置到隊列中。
異步線程作為消費者不斷從隊列中讀取內容,導入到數據庫中。
「一旦隊列滿載,生產者應該阻塞,直到消費者消費任務?!?/strong>
其實我們使用線程池的也是一個「生產者-消費者」消費模型,其也使用阻塞隊列。
那為什么線程池在隊列滿載的時候,不發(fā)生阻塞?
這是因為線程池內部使用 ??offer?? 方法,這個方法在隊列滿載的時候「不會發(fā)生阻塞」,而是直接返回 。
那我們有沒有辦法在線程池隊列滿載的時候,阻塞主線程添加任務?
其實是可以的,我們自定義線程池拒絕策略,當隊列滿時改為調用 ??BlockingQueue.put?? 來實現生產者的阻塞。
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
// should not be interrupted
}
}
}
};
這樣一旦線程池滿載,主線程將會被阻塞。
使用這種方式之后,我們可以直接使用上面提到的多線程導入的代碼。
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
60,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("test-%d").build(),
(r, executor) -> {
if (!executor.isShutdown()) {
try {
// 主線程將會被阻塞
executor.getQueue().put(r);
} catch (InterruptedException e) {
// should not be interrupted
}
}
});
File file = new File("文件路徑");
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
while (iterator.hasNext()) {
String line = iterator.nextLine();
executorService.submit(() -> convertToDB(line));
}
}
小結
一個超大的文件,我們可以采用拆分文件的方式,將其拆分成多份文件,然后部署多個應用程序提高讀取速度。
另外讀取過程我們還可以使用多線程的方式并發(fā)導入,不過我們需要注意線程池滿載之后,將會拒絕后續(xù)任務。
我們可以通過擴展線程池,自定義拒絕策略,使讀取主線程阻塞。
好了,今天文章內容就到這里,不知道各位有沒有其他更好的解決辦法,
名稱欄目:30G超大數據文件,如何用一周時間導入生產數據庫?
瀏覽路徑:http://m.fisionsoft.com.cn/article/cojgoss.html


咨詢
建站咨詢

