摘要:包主要實(shí)現(xiàn)類,這是一個(gè)抽象類,實(shí)現(xiàn)了通用的模板方法,并在方法內(nèi)部判斷錯(cuò)誤重試去重處理等。重置重復(fù)檢查就是清空,獲取請(qǐng)求總數(shù)也就是獲取的。至于請(qǐng)求總數(shù)統(tǒng)計(jì),就是返回中維護(hù)的的大小。
Scheduler是Webmagic中的url調(diào)度器,負(fù)責(zé)從Spider處理收集(push)需要抓取的url(Page的targetRequests)、并poll出將要被處理的url給Spider,同時(shí)還負(fù)責(zé)對(duì)url判斷是否進(jìn)行錯(cuò)誤重試、及去重處理、以及總頁(yè)面數(shù)、剩余頁(yè)面數(shù)統(tǒng)計(jì)等。
主要接口:
Scheduler,定義了基本的push和poll方法。基本接口。
MonitorableScheduler,繼承自Scheduler的接口,定義了獲取剩余url請(qǐng)求數(shù)和總請(qǐng)求數(shù)的方法。便于監(jiān)控。
core包主要實(shí)現(xiàn)類:
DuplicateRemovedScheduler,這是一個(gè)抽象類,實(shí)現(xiàn)了通用的push模板方法,并在push方法內(nèi)部判斷錯(cuò)誤重試、去重處理等。去重策略采用的是HashSetDuplicateRemover類,這個(gè)會(huì)在稍后說(shuō)明。
PriorityScheduler,內(nèi)置兩個(gè)優(yōu)先級(jí)隊(duì)列(+,-)和一個(gè)非優(yōu)先級(jí)阻塞隊(duì)列的調(diào)度器。
QueueScheduler,內(nèi)置一個(gè)阻塞隊(duì)列的調(diào)度器。這是默認(rèn)采用的。
URL去重策略:
DuplicateRemover:去重接口,含有判斷是否重復(fù),重置重復(fù)檢查,獲取請(qǐng)求總數(shù)的方法。
HashSetDuplicateRemover:DuplicateRemover的實(shí)現(xiàn)類,內(nèi)部維護(hù)了一個(gè)并發(fā)安全的HashSet。
先說(shuō)下去重策略的具體實(shí)現(xiàn)。核心代碼如下:
public class HashSetDuplicateRemover implements DuplicateRemover { private Seturls = Collections.newSetFromMap(new ConcurrentHashMap ()); @Override public boolean isDuplicate(Request request, Task task) { return !urls.add(getUrl(request)); } 。。。 @Override public void resetDuplicateCheck(Task task) { urls.clear(); } @Override public int getTotalRequestsCount(Task task) { return urls.size(); } }
去重策略類很簡(jiǎn)單,就是維護(hù)一個(gè)并發(fā)安全的HashSet。然后通過(guò)add方法是否成功來(lái)判斷是否是重復(fù)的url。重置重復(fù)檢查就是清空set,獲取請(qǐng)求總數(shù)也就是獲取set的size。簡(jiǎn)單明了。但是你以為去重就這么點(diǎn),那么你錯(cuò)了。繼續(xù)看。
public abstract class DuplicateRemovedScheduler implements Scheduler { private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover(); @Override public void push(Request request, Task task) { if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) { pushWhenNoDuplicate(request, task); } } protected boolean shouldReserved(Request request) { return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; } protected boolean noNeedToRemoveDuplicate(Request request) { return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod()); } protected void pushWhenNoDuplicate(Request request, Task task) { } }
DuplicateRemovedScheduler是一個(gè)抽象類,提供了push的通用模板,并為子類提供了pushWhenNoDuplicate用于實(shí)現(xiàn)自己的策略。push方法用于同一處理去重和重試機(jī)制。
首先判斷是否需要進(jìn)行錯(cuò)誤重試,如果需要,那么就直接push到隊(duì)列中,否則判斷請(qǐng)求是否為POST方法,如果是直接加入隊(duì)列,(這里需要注意的是,POST請(qǐng)求的url不會(huì)被加入HashSetDuplicateRemover維護(hù)的urls集合,故而也不會(huì)被加入到最終的getTotalRequestsCount的統(tǒng)計(jì)中,所以最終我們獲取的統(tǒng)計(jì)信息只是針對(duì)GET請(qǐng)求的。),否則進(jìn)行去重判斷。
根據(jù)不同調(diào)度器的實(shí)現(xiàn),pushWhenNoDuplicate的實(shí)現(xiàn)方式不一樣。
在PriorityScheduler中內(nèi)置兩個(gè)優(yōu)先級(jí)隊(duì)列(+,-)和一個(gè)非優(yōu)先級(jí)阻塞隊(duì)列的調(diào)度器,其pushWhenNoDuplicate代碼如下:
public void pushWhenNoDuplicate(Request request, Task task) { if (request.getPriority() == 0) { noPriorityQueue.add(request); } else if (request.getPriority() > 0) { priorityQueuePlus.put(request); } else { priorityQueueMinus.put(request); } }
根據(jù)Request是否設(shè)置priority屬性,以及是否為正、負(fù)來(lái)決定加入到哪個(gè)隊(duì)列中。因?yàn)檫@影響了后續(xù)poll的先后順序。
在QueueScheduler中內(nèi)置一個(gè)阻塞隊(duì)列的調(diào)度器。其pushWhenNoDuplicate代碼如下:
public void pushWhenNoDuplicate(Request request, Task task) { queue.add(request); }
就是簡(jiǎn)單地將其加入隊(duì)列中。
以上就是關(guān)于URL去重及push的機(jī)制,接下來(lái)說(shuō)明poll思路:
在PriorityScheduler中,poll順序?yàn)閜lus隊(duì)列>noPriority隊(duì)列>minus隊(duì)列。
public synchronized Request poll(Task task) { Request poll = priorityQueuePlus.poll(); if (poll != null) { return poll; } poll = noPriorityQueue.poll(); if (poll != null) { return poll; } return priorityQueueMinus.poll(); }
在QueueScheduler中,簡(jiǎn)單粗暴。
public Request poll(Task task) { return queue.poll(); }
至于url請(qǐng)求總數(shù)統(tǒng)計(jì),就是返回HashSetDuplicateRemover中維護(hù)的urls set的大小。這里再次羅嗦一次:最終我們獲取的統(tǒng)計(jì)信息只是針對(duì)GET請(qǐng)求的。
public int getTotalRequestsCount(Task task) { return getDuplicateRemover().getTotalRequestsCount(task); }
當(dāng)然extensions擴(kuò)展模塊中還有些Scheduler實(shí)現(xiàn),比如RedisScheduler用作集群支持,F(xiàn)ileCacheQueueScheduler用來(lái)斷點(diǎn)續(xù)爬支持等。由于本系列文章是先分析核心包,后續(xù)分析擴(kuò)展包,所以關(guān)于這部分,后續(xù)補(bǔ)充。
RedisScheduler
思路是采用set來(lái)存儲(chǔ)已經(jīng)抓取過(guò)的url,list來(lái)存儲(chǔ)待抓url隊(duì)列,hash來(lái)存儲(chǔ)序列化數(shù)據(jù)(哈希中的鍵為url的SHA值,值為Request的json序列化字符串)。所有數(shù)據(jù)類型的鍵都是基于Spider的UUID來(lái)生成的,也就是說(shuō)每個(gè)Spider實(shí)例所擁有的都是不同的。
@Override public boolean isDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { return jedis.sadd(getSetKey(task), request.getUrl()) > 0; } finally { pool.returnResource(jedis); } } @Override protected void pushWhenNoDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { jedis.rpush(getQueueKey(task), request.getUrl()); if (request.getExtras() != null) { String field = DigestUtils.shaHex(request.getUrl()); String value = JSON.toJSONString(request); jedis.hset((ITEM_PREFIX + task.getUUID()), field, value); } } finally { pool.returnResource(jedis); } } @Override public synchronized Request poll(Task task) { Jedis jedis = pool.getResource(); try { String url = jedis.lpop(getQueueKey(task)); if (url == null) { return null; } String key = ITEM_PREFIX + task.getUUID(); String field = DigestUtils.shaHex(url); byte[] bytes = jedis.hget(key.getBytes(), field.getBytes()); if (bytes != null) { Request o = JSON.parseObject(new String(bytes), Request.class); return o; } Request request = new Request(url); return request; } finally { pool.returnResource(jedis); } }
@Override public int getLeftRequestsCount(Task task) { Jedis jedis = pool.getResource(); try { Long size = jedis.llen(getQueueKey(task)); return size.intValue(); } finally { pool.returnResource(jedis); } } @Override public int getTotalRequestsCount(Task task) { Jedis jedis = pool.getResource(); try { Long size = jedis.scard(getSetKey(task)); return size.intValue(); } finally { pool.returnResource(jedis); } }
這些代碼都很好理解,只要有點(diǎn)redis基礎(chǔ)的都沒(méi)問(wèn)題,這里就不再贅述了。
至于RedisPriorityScheduler就是采用有序的zset來(lái)存儲(chǔ)plus、min隊(duì)列,list來(lái)存儲(chǔ)noprioprity隊(duì)列。
FileCacheQueueScheduler
思路是維護(hù)兩個(gè)文件.cursor.txt,.urls.txt 前者由于存儲(chǔ)一個(gè)數(shù)字,這個(gè)數(shù)字代表了讀取.urls.txt的行數(shù)。后者用來(lái)存儲(chǔ)所有的urls。初始化時(shí)從兩個(gè)文件讀取內(nèi)存中,并初始化urls集和queue隊(duì)列、同時(shí)初始化flush線程定時(shí)flush內(nèi)容到文件中。當(dāng)poll和pushWhenNoDuplicate時(shí)和原來(lái)邏輯差不多,只不過(guò)加了寫文件的步驟。
需要注意的是:FileCacheQueueScheduler實(shí)現(xiàn)了自己的去重規(guī)則,而不是直接使用DuplicateRemovedScheduler父類的去重規(guī)則。不過(guò)原理都一樣,都是通過(guò)Set來(lái)去重。
以上就是關(guān)于調(diào)度器的部分,下篇主題待定。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66885.html
摘要:獲取正在運(yùn)行的線程數(shù),用于狀態(tài)監(jiān)控。之后初始化組件主要是初始化線程池將到中,初始化開(kāi)始時(shí)間等。如果線程池中運(yùn)行線程數(shù)量為,并且默認(rèn),那么就停止退出,結(jié)束爬蟲。 本系列文章,針對(duì)Webmagic 0.6.1版本 一個(gè)普通爬蟲啟動(dòng)代碼 public static void main(String[] args) { Spider.create(new GithubRepoPageP...
摘要:爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之之進(jìn)階 爬蟲框架Webmagic源碼分析之Spider爬蟲框架WebMagic源碼分析之Scheduler爬蟲框架WebMagic源碼分析之Downloader爬蟲框架WebMagic源碼分析之Selector爬蟲框架WebMagic源碼分析之SeleniumWebMagic之Spider進(jìn)階
摘要:實(shí)際運(yùn)行中就發(fā)現(xiàn)了一個(gè)有趣的現(xiàn)象。爬蟲抓取的速度超過(guò)了我用給它推送的速度,導(dǎo)致爬蟲從獲取不到同時(shí)此刻線程池所有線程都已停止。如何管理設(shè)置,避免返回,且沒(méi)有工作線程時(shí)退出循環(huán)。退出檢測(cè)循環(huán)說(shuō)明結(jié)束了,手動(dòng)調(diào)用來(lái)是退出調(diào)度循環(huán),終止爬蟲。 Webmagic源碼分析系列文章,請(qǐng)看這里 從解決問(wèn)題開(kāi)始吧。 問(wèn)題描述:由于數(shù)據(jù)庫(kù)的數(shù)據(jù)量特別大,而且公司沒(méi)有搞主從讀寫分離,導(dǎo)致從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)比較...
摘要:主要用于選擇器抽象類,實(shí)現(xiàn)類前面說(shuō)的兩個(gè)接口,主要用于選擇器繼承。多個(gè)選擇的情形,每個(gè)選擇器各自獨(dú)立選擇,將所有結(jié)果合并。抽象類,定義了一些模板方法。這部分源碼就不做分析了。這里需要提到的一點(diǎn)是返回的不支持選擇,返回的對(duì)象支持選擇。 1、Selector部分:接口:Selector:定義了根據(jù)字符串選擇單個(gè)元素和選擇多個(gè)元素的方法。ElementSelector:定義了根據(jù)jsoup ...
摘要:有一個(gè)模塊其中實(shí)現(xiàn)了一個(gè)。但是感覺(jué)靈活性不大。接口如下它會(huì)獲得一個(gè)實(shí)例,你可以在里面進(jìn)行任意的操作。本部分到此結(jié)束。 webmagic有一個(gè)selenium模塊,其中實(shí)現(xiàn)了一個(gè)SeleniumDownloader。但是感覺(jué)靈活性不大。所以我就自己參考實(shí)現(xiàn)了一個(gè)。 首先是WebDriverPool用來(lái)管理WebDriver池: import java.util.ArrayList; im...
閱讀 2096·2023-04-25 15:24
閱讀 1606·2019-08-30 12:55
閱讀 1641·2019-08-29 15:27
閱讀 497·2019-08-26 17:04
閱讀 2445·2019-08-26 10:59
閱讀 1828·2019-08-26 10:44
閱讀 2230·2019-08-22 16:15
閱讀 2614·2019-08-22 15:36