摘要:最開始是使用的正常的普通方式去寫入,但是量太大了,所以就嘗試使用多線程來寫入。下面我們就來介紹一下怎么使用多線程進行導(dǎo)入。配置線程池我們需要創(chuàng)建一個類來設(shè)置線程池的各種配置。它可以使主線程一直等到所有的子線程執(zhí)行完之后再執(zhí)行。
前言:
最近在工作中需要將一大批數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫中,因為種種原因這些數(shù)據(jù)不能使用同步數(shù)據(jù)的方式來進行復(fù)制,而是提供了一批文本,文本里面有很多行url地址,需要的字段都包含在這些url中。最開始是使用的正常的普通方式去寫入,但是量太大了,所以就嘗試使用多線程來寫入。下面我們就來介紹一下怎么使用多線程進行導(dǎo)入。
1.文本格式格式就是類似于這種格式的url,當(dāng)然這里只是舉個例子,大概有300多個文本,每個文本里面有大概25000條url,而每條url要插入兩個表,這個量還是有點大的,單線程跑的非常慢。
https://www.test.com/?type=1&code=123456&goodsId=321
2.springboot配置線程池我們需要創(chuàng)建一個ExecutorConfig類來設(shè)置線程池的各種配置。
@Configuration @EnableAsync public class ExecutorConfig { private static Logger logger = LogManager.getLogger(ExecutorConfig.class.getName()); @Bean public Executor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(5); //配置最大線程數(shù) executor.setMaxPoolSize(10); //配置隊列大小 executor.setQueueCapacity(400); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("thread-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }3.創(chuàng)建異步任務(wù)接口
我們需要創(chuàng)建一個接口,再這個接口里面聲明了我們需要調(diào)用的異步方法
public interface AsyncService { /** * 執(zhí)行異步任務(wù) */ void writeTxt(); }4.創(chuàng)建異步實現(xiàn)類
再創(chuàng)建一個異步類實現(xiàn)上面的異步接口,重寫接口里面的方法,最重要的是我們需要在方法上加@Async("asyncServiceExecutor")注解,它是剛剛我們在線程池配置類的里的那個配制方法的名字,加上這個后每次執(zhí)行這個方法都會開啟一個線程放入線程池中。我下面這個方法是開啟多線程遍歷文件夾中的文件然后為每個文件都復(fù)制一個副本出來。
@Service public class AsyncServiceImpl implements AsyncService { private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName()); @Async("asyncServiceExecutor") public void writeTxt(String fileName){ logger.info("線程-" + Thread.currentThread().getId() + "在執(zhí)行寫入"); try { File file = new File(fileName); Listlines = FileUtils.readLines(file); File copyFile = new File(fileName + "_copy.txt"); lines.stream().forEach(string->{ try { FileUtils.writeStringToFile(copyFile,string,"utf8",true); FileUtils.writeStringToFile(copyFile," ","utf8",true); } catch (IOException e) { logger.info(e.getMessage()); } }); }catch (Exception e) { logger.info(e.getMessage()); } } }
@RunWith(SpringRunner.class) @SpringBootTest public class BootApplicationTests { @Autowired private AsyncService asyncService; @Test public void write() { File file = new File("F://ac_code_1//test.txt"); try { FileUtils.writeStringToFile(file, "ceshi", "utf8"); FileUtils.writeStringToFile(file, " ", "utf8"); FileUtils.writeStringToFile(file, "ceshi2", "utf8"); } catch (IOException e) { e.printStackTrace(); } }5.修改為阻塞式
上面的步驟已經(jīng)基本實現(xiàn)了多線程的操作,但是當(dāng)我真的開始導(dǎo)入數(shù)據(jù)的時候又發(fā)現(xiàn)一個問題,就是每次運行后才剛開始導(dǎo)入就自動停止了,原因是我在Junit中運行了代碼后它雖然開始導(dǎo)入了,但是因為數(shù)據(jù)很多時間很長,而Juint跑完主線程的邏輯后就把整個JVM都關(guān)掉了,所以導(dǎo)入了一點點就停止了,上面的測試方法之所以沒問題是因為幾個文件的復(fù)制速度很快,在主線程跑完之前就跑完了,所以看上去沒問題。最開始我用了一個最笨的方法,直接在主線程最后調(diào)用Thread.sleep()方法,雖然有效果但是這也太low了,而且你也沒法判斷到底數(shù)據(jù)導(dǎo)完沒有。所以我又換了一個方式。
6.使用countDownLatch阻塞主線程CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。它可以使主線程一直等到所有的子線程執(zhí)行完之后再執(zhí)行。我們修改下代碼,創(chuàng)建一個CountDownLatch實例,大小是所有運行線程的數(shù)量,然后在異步類的方法中的finally里面對它進行減1,在主線程最后調(diào)用await()方法,這樣就能確保所有的子線程運行完后主線程才會繼續(xù)執(zhí)行。
@RunWith(SpringRunner.class) @SpringBootTest public class BootApplicationTests { private final CountDownLatch countDownLatch = new CountDownLatch(10); @Autowired private AsyncService asyncService; @Test public void mainWait() { try { for (int i = 0; i < 10; i++) { asyncService.mainWait(countDownLatch); } countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
@Service public class AsyncServiceImpl implements AsyncService { private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName()); @Override @Async("asyncServiceExecutor") public void mainWait(CountDownLatch countDownLatch) { try { System.out.println("線程" + Thread.currentThread().getId() + "開始執(zhí)行"); for (int i=1;i<1000000000;i++){ Integer integer = new Integer(i); int l = integer.intValue(); for (int x=1;x<10;x++){ Integer integerx = new Integer(x); int j = integerx.intValue(); } } System.out.println("線程" + Thread.currentThread().getId() + "執(zhí)行結(jié)束"); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } } }7.導(dǎo)入代碼
雖然上面的多線程是重點,不過還是把導(dǎo)入數(shù)據(jù)的代碼展示出來給大家參考一下,當(dāng)然這是簡化版,真實的要比這個多了很多判斷,不過那都是基于業(yè)務(wù)需求做的判斷。
@RunWith(value = SpringRunner.class) @SpringBootTest public class ApplicationTests { private static Log logger = LogFactory.getLog(ApplicationTests.class); private final CountDownLatch countDownLatch; @Autowired AsyncService asyncService; @Test public void writeCode() { try { File file = new File("F:ac_code_1"); File[] files = file.listFiles(); //計數(shù)器數(shù)量就等于文件數(shù)量,因為每個文件會開一個線程 countDownLatch = new CountDownLatch(files.length); Arrays.stream(files).forEach(file1 -> { File child = new File(file1.getAbsolutePath()); String fileName = child.getAbsolutePath(); logger.info(asyncService.writeCode(fileName,countDownLatch)); }); countDownLatch.await(); catch (InterruptedException e) { e.printStackTrace(); } } }
@Service public class AsyncServiceImpl implements AsyncService { private static Log logger = LogFactory.getLog(AsyncServiceImpl.class); @Autowired IExampleService exampleService; @Override @Async("asyncServiceExecutor") public String writeCode(String fileName,CountDownLatch countDownLatch) { logger.info("線程-" + Thread.currentThread().getId() + "在導(dǎo)入-" + fileName); try { File file = new File(fileName); List總結(jié):list = FileUtils.readLines(file); for (String string : list) { String[] parmas = string.split(","); ExampleVo vo = new ExampleVo(); vo.setParam1(parmas[0]); vo.setParam1(parmas[1]); vo.setParam1(parmas[2]); exampleService.save(vo); } return "導(dǎo)入完成-" + fileName; }catch (Exception e){ e.printStackTrace(); return null; }finally { //導(dǎo)入完后減1 countDownLatch.countDown(); } } }
到這里就已經(jīng)講完了多線程插入數(shù)據(jù)的方法,目前這個方法還很簡陋。因為是每個文件都開一個線程性能消耗比較大,而且如果線程線程池的線程配置太多了,頻繁切換反而會變得很慢,大家如果有更好的辦法都可以留言討論。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77066.html
摘要:對多線程的支持詳解這兩天看阿里的開發(fā)手冊,到多線程的時候說永遠(yuǎn)不要用這種方式來使用多線程。在使用線程池的大多數(shù)情況下都是異步非阻塞的。二配置類配置類代碼如下下午解讀利用來開啟對于異步任務(wù)的支持配置類實現(xiàn)接口,返回一個線程池對象。 Springboot對多線程的支持詳解 這兩天看阿里的JAVA開發(fā)手冊,到多線程的時候說永遠(yuǎn)不要用 new Thread()這種方式來使用多線程。確實是這樣的...
摘要:也是自帶的一個基于線程池設(shè)計的定時任務(wù)類。其每個調(diào)度任務(wù)都會分配到線程池中的一個線程執(zhí)行,所以其任務(wù)是并發(fā)執(zhí)行的,互不影響。 原創(chuàng)不易,如需轉(zhuǎn)載,請注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責(zé)任?。?! 一、在JAVA開發(fā)領(lǐng)域,目前可以通過以下幾種方式進行定時任務(wù) 1、單機部署模式 Timer:jdk中...
摘要:馬蜂窩推薦系統(tǒng)對于請求的平均處理時延要求在級別,時延的線保持在以內(nèi)。任務(wù)隊列與異步寫入這里我們使用了中的線程池來實現(xiàn)。三優(yōu)化方向基于和,我們在現(xiàn)有的推薦系統(tǒng)中增加了一個本地容災(zāi)緩存系統(tǒng),當(dāng)依賴服務(wù)或者應(yīng)用本身突發(fā)異常時可以返回緩存的數(shù)據(jù)。 數(shù)據(jù)庫突然斷開連接、第三方接口遲遲不返回結(jié)果、高峰期網(wǎng)絡(luò)發(fā)生抖動...... 當(dāng)程序突發(fā)異常時,我們的應(yīng)用可以告訴調(diào)用方或者用戶「對不起,服務(wù)器出了...
摘要:馬蜂窩推薦系統(tǒng)對于請求的平均處理時延要求在級別,時延的線保持在以內(nèi)。任務(wù)隊列與異步寫入這里我們使用了中的線程池來實現(xiàn)。三優(yōu)化方向基于和,我們在現(xiàn)有的推薦系統(tǒng)中增加了一個本地容災(zāi)緩存系統(tǒng),當(dāng)依賴服務(wù)或者應(yīng)用本身突發(fā)異常時可以返回緩存的數(shù)據(jù)。數(shù)據(jù)庫突然斷開連接、第三方接口遲遲不返回結(jié)果、高峰期網(wǎng)絡(luò)發(fā)生抖動...... 當(dāng)程序突發(fā)異常時,我們的應(yīng)用可以告訴調(diào)用方或者用戶「對不起,服務(wù)器出了點問題」...
閱讀 3275·2023-04-25 22:47
閱讀 3789·2021-10-11 10:59
閱讀 2317·2021-09-07 10:12
閱讀 4273·2021-08-11 11:15
閱讀 3442·2019-08-30 13:15
閱讀 1759·2019-08-30 13:00
閱讀 979·2019-08-29 14:02
閱讀 1698·2019-08-26 13:57