摘要:獲取正在運行的線程數(shù),用于狀態(tài)監(jiān)控。之后初始化組件主要是初始化線程池將到中,初始化開始時間等。如果線程池中運行線程數(shù)量為,并且默認,那么就停止退出,結束爬蟲。
本系列文章,針對Webmagic 0.6.1版本
一個普通爬蟲啟動代碼
public static void main(String[] args) { Spider.create(new GithubRepoPageProcessor()) 從https:github.com/code4craft開始抓 .addUrl("https://github.com/code4craft") //設置Scheduler,使用Redis來管理URL隊列 .setScheduler(new RedisScheduler("localhost")) //設置Pipeline,將結果以json方式保存到文件 .addPipeline(new JsonFilePipeline("D:datawebmagic")) //開啟5個線程同時執(zhí)行 .thread(5) //啟動爬蟲 .run(); }
1、spider可配置插拔組件:
Downloader 提供自定義的Downloader,默認為HttpClientDownloader
Pipeline 提供自定義的Pipeline,可以配置多個,多個Pipeline鏈式處理結果。默認為ConsolePipeline
Scheduler 提供自定義的調(diào)度器,默認為QueueScheduler
PageProcessor 頁面處理組件,開發(fā)者爬蟲的實現(xiàn)
ExecutorService 可以用于提供自己實現(xiàn)的線程池來監(jiān)控,默認為Fixed ExecutorService
SpiderListener 頁面狀態(tài)監(jiān)聽器,提供每個頁面成功和錯誤的回調(diào)??膳渲枚鄠€。
其中有:WebMagic四大組件:Pipeline,Scheduler,Downloader和PageProcesser 。這和Python中的Scrapy的理念是一致的。但是Scrapy還有一些中間件的概念,從結構圖中便可以看出區(qū)別
2、狀態(tài)變量:
stat 0,初始化;1,運行中;2,已停止
pageCount 已經(jīng)抓取的頁面數(shù)。注意:這里統(tǒng)計的是GET請求的頁面,POST請求的頁面不在統(tǒng)計的范圍之內(nèi)。具體原因見DuplicateRemovedScheduler類
startTime:開始時間,可用于計算耗時。
emptySleepTime 最大空閑等待時間,默認30s。如果抓取隊列為空,且url隊列為空的最大等待時長,超過該時間,就認為爬蟲抓取完成,停止運行。
threadNum : 啟用的線程數(shù),默認1.
threadPool:這是Webmagic提供的CountableThreadPool實例,內(nèi)部封裝了ExecutorService,CountableThreadPool 提供了額外的獲取線程運行數(shù)的方法,此外為防止大量urls入池等待,提供了阻塞方式管理urls入池。(后續(xù)細說)
destroyWhenExit:默認true。是否在調(diào)用stop()時立即停止所有任務并退出。
spawUrl : 默認為true,是否抓取除了入口頁面starturls之外的其他頁面(targetRequests).
3、需要配置的項:
Site 全局站點配置,如UA,timeout,sleep等
PageProcessor 頁面處理組件,開發(fā)者爬蟲的實現(xiàn)
Request 配置入口頁面url,可以多個。
uuid ,可選,Spider的名字,用于分析和日志。
需要注意的是:每個修改配置的方法都進行了checkIfRunning檢查,如果檢查當前Spider正在運行,它會拋出IllegalStateException。
所有配置方法都return this,便于鏈式調(diào)用,類似于builder模式。
4、運行方式:
Spider實現(xiàn)了Runnable接口(還有一個Webmagic自己的Task接口)。
run(),跟普通的Runnable一樣,阻塞式運行,會阻塞當前線程直至Spider運行結束。
runAsync(),就是new一個Thread來運行當前Spider這個Runnable,異步運行。
start(),runAsync()的別名方法,異步運行。
5、狀態(tài)相關方法
stop(),結束當前爬蟲的運行,內(nèi)部只是簡單地修改一下狀態(tài),如果設置了destroyWhenExit=true(默認就是true)那么會立即停止所有任務并清除資源,否則并不會停止正在線程池中運行的線程,也不會銷毀線程池。
getThreadAlive() 獲取正在運行的線程數(shù),用于狀態(tài)監(jiān)控。
6、核心代碼分析
public void run() { checkRunningStat(); initComponent(); logger.info("Spider " + getUUID() + " started!"); while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { processRequest(request); onSuccess(request); } catch (Exception e) { onError(request); logger.error("process request " + request + " error", e); } finally { pageCount.incrementAndGet(); signalNewUrl(); } } }); } } stat.set(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } }
首先通過checkRunningStat()來檢查并設置運行狀態(tài),如果已經(jīng)在運行了,那么會拋出IllegalStateException。之后初始化組件(主要是初始化Downloader、線程池、將starturls push到Scheduler中,初始化開始時間等)。之后進入循環(huán),從scheduler中poll出Request給線程池去執(zhí)行。如果scheduler中沒有request了:繼而判斷是否有線程在運行和是否設置了立即退出標志,如果設置了立即退出循環(huán),否則調(diào)用waitNewUrl()等待有新的url被加入。
waitNewUrl()采用RetreentLock和Condition來進行超時阻塞,一旦阻塞時間超過emptySleepTime就返回。如果線程池中運行線程數(shù)量為0,并且exitWhenComplete=true(默認),那么就停止退出,結束爬蟲。如果exitWhenComplete=false,那么需要開發(fā)者手動調(diào)用stop()來停止退出爬蟲,并調(diào)用close()來清理資源。
通過processRequest來處理抓取url的整個流程,代碼如下:
protected void processRequest(Request request) { Page page = downloader.download(request, this); if (page == null) { sleep(site.getSleepTime()); onError(request); return; } // for cycle retry if (page.isNeedCycleRetry()) { extractAndAddRequests(page, true); sleep(site.getRetrySleepTime()); return; } pageProcessor.process(page); extractAndAddRequests(page, spawnUrl); if (!page.getResultItems().isSkip()) { for (Pipeline pipeline : pipelines) { pipeline.process(page.getResultItems(), this); } } //for proxy status management request.putExtra(Request.STATUS_CODE, page.getStatusCode()); sleep(site.getSleepTime()); }
它在內(nèi)部調(diào)用downloader下載頁面得到Page(Page代表了一個頁面),然后判斷是否需要重試(needCycleRetry標志會在downloader下載頁面發(fā)生異常時被設置為true,同時會把自己本身request加到targetRequests當中),如果需要,則抽取targetRequests到scheduler當中。如果都沒問題,繼續(xù)調(diào)用我們實現(xiàn)的頁面處理器進行處理,之后再抽取我們在頁面處理器中放入的targetRequests(即需要繼續(xù)抓取的url)到scheduler當中。之后便是調(diào)用pipeline進行處理(一般做持久化操作,寫到數(shù)據(jù)庫、文件之類的),但是如果我們在頁面處理器中為page設置了skip標志,那么就不會調(diào)用pipeline進行處理。
當然其中還包括一些重試休眠時間、繼續(xù)抓取等待時間等來更好地控制爬蟲抓取頻率。
說完processRequest,我們回到run()繼續(xù)分析,處理完之后,就是調(diào)用監(jiān)聽器,告訴其成功還是失敗,最后抓取數(shù)加+1,然后通知新url被加入(通知waitNewUrl()可以返回繼續(xù)了)。
需要說明的一點是,Spider類中的狀態(tài)管理大量用到了Jdk Atomic原子包下的CAS并發(fā)原子類。
7、CountableThreadPool
前面說過Spider采用的線程池對象CountableThreadPool內(nèi)部封裝了ExecutorService,CountableThreadPool 提供了額外的獲取線程運行數(shù)的方法,此外為防止大量urls入池等待,提供了阻塞方式管理urls入池。
阻塞方式的實現(xiàn)是通過ReentrantLock和它的Condition來實現(xiàn)的。具體代碼如下:
public void execute(final Runnable runnable) { if (threadAlive.get() >= threadNum) { try { reentrantLock.lock(); while (threadAlive.get() >= threadNum) { try { condition.await(); } catch (InterruptedException e) { } } } finally { reentrantLock.unlock(); } } threadAlive.incrementAndGet(); executorService.execute(new Runnable() { @Override public void run() { try { runnable.run(); } finally { try { reentrantLock.lock(); threadAlive.decrementAndGet(); condition.signal(); } finally { reentrantLock.unlock(); } } } }); }
邏輯是這樣的,如果正在運行的線程數(shù)threadAlive超過允許的線程數(shù),就阻塞等待,直至收到某個線程結束通知。
羅嗦一句,這里的線程安全控制,主要是用到了JDK atomic包來表示狀態(tài)和ReentrantLock、Condition來控制達到類似生產(chǎn)者消費者的阻塞機制。
關于Spider就分析到這里,后續(xù)主題待定。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/66862.html
摘要:爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之之進階 爬蟲框架Webmagic源碼分析之Spider爬蟲框架WebMagic源碼分析之Scheduler爬蟲框架WebMagic源碼分析之Downloader爬蟲框架WebMagic源碼分析之Selector爬蟲框架WebMagic源碼分析之SeleniumWebMagic之Spider進階
摘要:包主要實現(xiàn)類,這是一個抽象類,實現(xiàn)了通用的模板方法,并在方法內(nèi)部判斷錯誤重試去重處理等。重置重復檢查就是清空,獲取請求總數(shù)也就是獲取的。至于請求總數(shù)統(tǒng)計,就是返回中維護的的大小。 Scheduler是Webmagic中的url調(diào)度器,負責從Spider處理收集(push)需要抓取的url(Page的targetRequests)、并poll出將要被處理的url給Spider,同時還負責...
摘要:方法,首先判斷是否有這是在中配置的,如果有,直接調(diào)用的將相應內(nèi)容轉化成對應編碼字符串,否則智能檢測響應內(nèi)容的字符編碼。 Downloader是負責請求url獲取返回值(html、json、jsonp等)的一個組件。當然會同時處理POST重定向、Https驗證、ip代理、判斷失敗重試等。 接口:Downloader 定義了download方法返回Page,定義了setThread方法來...
摘要:實際運行中就發(fā)現(xiàn)了一個有趣的現(xiàn)象。爬蟲抓取的速度超過了我用給它推送的速度,導致爬蟲從獲取不到同時此刻線程池所有線程都已停止。如何管理設置,避免返回,且沒有工作線程時退出循環(huán)。退出檢測循環(huán)說明結束了,手動調(diào)用來是退出調(diào)度循環(huán),終止爬蟲。 Webmagic源碼分析系列文章,請看這里 從解決問題開始吧。 問題描述:由于數(shù)據(jù)庫的數(shù)據(jù)量特別大,而且公司沒有搞主從讀寫分離,導致從數(shù)據(jù)庫讀取數(shù)據(jù)比較...
摘要:優(yōu)雅的使用框架,爬取唐詩別苑網(wǎng)的詩人詩歌數(shù)據(jù)同時在幾種動態(tài)加載技術中對比作選擇雖然差不多兩年沒有維護,但其本身是一個優(yōu)秀的爬蟲框架的實現(xiàn),源碼中有很多值得參考的地方,特別是對爬蟲多線程的控制。 優(yōu)雅的使用WebMagic框架,爬取唐詩別苑網(wǎng)的詩人詩歌數(shù)據(jù) 同時在幾種動態(tài)加載技術(HtmlUnit、PhantomJS、Selenium、JavaScriptEngine)中對比作選擇 We...
閱讀 3213·2021-11-08 13:18
閱讀 1366·2021-10-09 09:57
閱讀 1198·2021-09-22 15:33
閱讀 3997·2021-08-17 10:12
閱讀 5079·2021-08-16 11:02
閱讀 2693·2019-08-30 10:56
閱讀 975·2019-08-29 18:31
閱讀 3263·2019-08-29 16:30