摘要:所以得出結(jié)論需要分配較多的線程進(jìn)行讀數(shù)據(jù),較少的線程進(jìn)行寫數(shù)據(jù)。注意多線程編程對實(shí)際環(huán)境和需求有很大的依賴,需要根據(jù)實(shí)際的需求情況對各個(gè)參數(shù)做調(diào)整。
背景
最近對于 Java 多線程做了一段時(shí)間的學(xué)習(xí),筆者一直認(rèn)為,學(xué)習(xí)東西就是要應(yīng)用到實(shí)際的業(yè)務(wù)需求中的。否則要么無法深入理解,要么硬生生地套用技術(shù)只是達(dá)到炫技的效果。
不過筆者仍舊認(rèn)為自己對于多線程掌握不夠熟練,不敢輕易應(yīng)用到生產(chǎn)代碼中。這就按照平時(shí)工作中遇到的實(shí)際問題,腦補(bǔ)了一個(gè)很可能存在的業(yè)務(wù)場景:
已知某公司管理著 1000 個(gè)微信服務(wù)號,每個(gè)服務(wù)號有 1w ~ 50w 粉絲不等。假設(shè)該公司每天都需要將所有微信服務(wù)號的粉絲數(shù)據(jù)通過調(diào)用微信 API 的方式更新到本地?cái)?shù)據(jù)庫。
需求分析對此需求進(jìn)行分析,主要存在以下問題:
單個(gè)服務(wù)號獲取粉絲 id,只能每次 1w 按順序拉取
微信的 API 對于服務(wù)商的并發(fā)請求數(shù)量有限制
單個(gè)服務(wù)號獲取粉絲 id,只能每次 1w 按順序拉取。這個(gè)問題決定了單個(gè)公眾號在拉取粉絲 id 上,無法分配給多個(gè)線程執(zhí)行。
微信的 API 對于服務(wù)商的并發(fā)請求數(shù)量有限制。這點(diǎn)最容易被忽略,如果我們同時(shí)有過多的請求,則會(huì)導(dǎo)致接口被封禁。這里可以通過信號量來控制同時(shí)執(zhí)行的線程數(shù)量。
為了盡快完成數(shù)據(jù)同步,根據(jù)實(shí)際情況:整個(gè)數(shù)據(jù)同步可分為讀數(shù)據(jù)和寫數(shù)據(jù)兩個(gè)部分。讀數(shù)據(jù)是通過 API 獲取,走網(wǎng)絡(luò) IO,速度較慢;寫數(shù)據(jù)是寫到數(shù)據(jù)庫,速度較快。所以得出結(jié)論:需要分配較多的線程進(jìn)行讀數(shù)據(jù),較少的線程進(jìn)行寫數(shù)據(jù)。
設(shè)計(jì)要點(diǎn)首先,我們需要確定開啟多少個(gè)線程(在生產(chǎn)中往往是使用線程池),線程數(shù)量需要根據(jù)服務(wù)器性能來決定,這里我們定為 40 個(gè)讀取數(shù)據(jù)線程(將 1000 個(gè)公眾號分為 40 份,分別在 40 個(gè)線程中執(zhí)行),1個(gè)寫入數(shù)據(jù)線程。(具體開多少個(gè)線程,取決于線程池的容量,以及可以分配給此業(yè)務(wù)的數(shù)量。具體的數(shù)字需要根據(jù)實(shí)際情況測試得出,比服務(wù)器閾值低一些較好。當(dāng)然,配置允許范圍內(nèi)越大越好)
其次,考慮到微信對于 API 并發(fā)請求的限制,需要限制同時(shí)執(zhí)行的線程數(shù),使用java.util.concurrent.Semaphore進(jìn)行控制,這里我們限制為 20 個(gè)(具體的信號量憑證數(shù),取決于同一時(shí)間能夠執(zhí)行的線程,跟 API 限制,服務(wù)器性能有關(guān))。
然后,我們需要知道數(shù)據(jù)何時(shí)讀取、寫入完畢,以控制程序邏輯以及終止程序,這里我們使用java.util.concurrent.CountDownLatch進(jìn)行控制。
最后,我們需要一個(gè)數(shù)據(jù)結(jié)構(gòu),用來在多個(gè)線程中共享處理的數(shù)據(jù),此處同步數(shù)據(jù)的場景非常適合使用隊(duì)列,這里我們使用線程安全的java.util.concurrent.ConcurrentLinkedQueue來進(jìn)行處理。(需要注意的是,在實(shí)際開發(fā)中,隊(duì)列不能夠無限制地增長,這將會(huì)很快消耗掉內(nèi)存,我們需要根據(jù)實(shí)際情況對隊(duì)列長度做控制。例如,可以通過控制讀取線程數(shù)和寫入線程數(shù)的比例來控制隊(duì)列的長度)
模擬代碼由于本文重點(diǎn)關(guān)注多線程的使用,模擬代碼只體現(xiàn)多線程操作的方法。代碼里添加了大量的注釋,方便各位讀者閱讀理解。
JDK:1.8
import java.util.Arrays; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * N個(gè)線程向隊(duì)列添加數(shù)據(jù) * 一個(gè)線程消費(fèi)隊(duì)列數(shù)據(jù) */ public class QueueTest { private static Listdata = Arrays.asList("a", "b", "c", "d", "e"); private static final int OFFER_COUNT = 40; // 開啟的線程數(shù)量 private static Semaphore semaphore = new Semaphore(20); // 同一時(shí)間執(zhí)行的線程數(shù)量(大多用于控制API調(diào)用次數(shù)或數(shù)據(jù)庫查詢連接數(shù)) public static void main(String[] args) throws InterruptedException { Queue queue = new ConcurrentLinkedQueue<>(); // 處理隊(duì)列,需要處理的數(shù)據(jù),放置到此隊(duì)列中 CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer線程latch,每完成一個(gè),latch減一,lacth的count為0時(shí)表示offer處理完畢 CountDownLatch pollLatch = new CountDownLatch(1); // poll線程latch,latch的count為0時(shí),表示poll處理完畢 Runnable offerRunnable = () -> { try { semaphore.acquire(); // 信號量控制 } catch (InterruptedException e) { e.printStackTrace(); } try { for (String datum : data) { queue.offer(datum); TimeUnit.SECONDS.sleep(2); // 模擬取數(shù)據(jù)很慢的情況 } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 在finally中執(zhí)行l(wèi)atch.countDown()以及信號量釋放,避免因異常導(dǎo)致沒有正常釋放 offerLatch.countDown(); semaphore.release(); } }; Runnable pollRunnable = () -> { int count = 0; try { while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未執(zhí)行完,或queue仍舊有數(shù)據(jù),則繼續(xù)循環(huán) String poll = queue.poll(); if (poll != null) { System.out.println(poll); count++; } // 無論是否poll到數(shù)據(jù),均暫停一小段時(shí)間,可降低CPU消耗 TimeUnit.MILLISECONDS.sleep(100); } System.out.println("total count:" + count); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 在finally中執(zhí)行l(wèi)atch.countDown(),避免因異常導(dǎo)致沒有正常釋放 pollLatch.countDown(); } }; // 啟動(dòng)線程(生產(chǎn)環(huán)境中建議使用線程池) new Thread(pollRunnable).start(); // 啟動(dòng)一個(gè)poll線程 for (int i = 0; i < OFFER_COUNT; i++) { new Thread(offerRunnable).start(); } // 模擬取數(shù)據(jù)很慢,需要開啟40個(gè)線程處理 // latch等待,會(huì)block主線程直到latch的count為0 offerLatch.await(); pollLatch.await(); System.out.println("===the end==="); } }
到這里,本文結(jié)束。以上是筆者腦補(bǔ)的一個(gè)常見需求的解決方案。
注意:多線程編程對實(shí)際環(huán)境和需求有很大的依賴,需要根據(jù)實(shí)際的需求情況對各個(gè)參數(shù)做調(diào)整。實(shí)際在使用中,需要盡量模擬生產(chǎn)環(huán)境的數(shù)據(jù)情況來進(jìn)行測試,對服務(wù)器執(zhí)行期間的并發(fā)數(shù),CPU、內(nèi)存、網(wǎng)絡(luò) IO、磁盤 IO 做好觀察。并適當(dāng)?shù)卣{(diào)低并發(fā)數(shù),以給服務(wù)器留有處理其他請求的余量。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/73249.html
摘要:后端好書閱讀與推薦系列文章后端好書閱讀與推薦后端好書閱讀與推薦續(xù)后端好書閱讀與推薦續(xù)二后端好書閱讀與推薦續(xù)三這里依然記錄一下每本書的亮點(diǎn)與自己讀書心得和體會(huì),分享并求拍磚。然后又請求封鎖,當(dāng)釋放了上的封鎖之后,系統(tǒng)又批準(zhǔn)了的請求一直等待。 后端好書閱讀與推薦系列文章:后端好書閱讀與推薦后端好書閱讀與推薦(續(xù))后端好書閱讀與推薦(續(xù)二)后端好書閱讀與推薦(續(xù)三) 這里依然記錄一下每本書的...
摘要:后端好書閱讀與推薦系列文章后端好書閱讀與推薦后端好書閱讀與推薦續(xù)后端好書閱讀與推薦續(xù)二后端好書閱讀與推薦續(xù)三這里依然記錄一下每本書的亮點(diǎn)與自己讀書心得和體會(huì),分享并求拍磚。然后又請求封鎖,當(dāng)釋放了上的封鎖之后,系統(tǒng)又批準(zhǔn)了的請求一直等待。 后端好書閱讀與推薦系列文章:后端好書閱讀與推薦后端好書閱讀與推薦(續(xù))后端好書閱讀與推薦(續(xù)二)后端好書閱讀與推薦(續(xù)三) 這里依然記錄一下每本書的...
摘要:后端好書閱讀與推薦系列文章后端好書閱讀與推薦后端好書閱讀與推薦續(xù)后端好書閱讀與推薦續(xù)二后端好書閱讀與推薦續(xù)三這里依然記錄一下每本書的亮點(diǎn)與自己讀書心得和體會(huì),分享并求拍磚。然后又請求封鎖,當(dāng)釋放了上的封鎖之后,系統(tǒng)又批準(zhǔn)了的請求一直等待。 后端好書閱讀與推薦系列文章:后端好書閱讀與推薦后端好書閱讀與推薦(續(xù))后端好書閱讀與推薦(續(xù)二)后端好書閱讀與推薦(續(xù)三) 這里依然記錄一下每本書的...
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。因?yàn)槎嗑€程競爭鎖時(shí)會(huì)引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時(shí)至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...
閱讀 1632·2021-11-22 13:53
閱讀 2874·2021-11-15 18:10
閱讀 2776·2021-09-23 11:21
閱讀 2518·2019-08-30 15:55
閱讀 492·2019-08-30 13:02
閱讀 769·2019-08-29 17:22
閱讀 1714·2019-08-29 13:56
閱讀 3467·2019-08-29 11:31