成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

MaxCompute讀取分析OSS非結(jié)構(gòu)化數(shù)據(jù)的實(shí)踐經(jīng)驗(yàn)總結(jié)

robin / 1687人閱讀

摘要:本文對使用分析文本數(shù)據(jù)的實(shí)踐過程中遇到的一些問題和優(yōu)化經(jīng)驗(yàn)進(jìn)行了總結(jié)。作為前提,讀者需要詳細(xì)了解讀取文本數(shù)據(jù)的一些基礎(chǔ)知識,對這篇官方文檔訪問非結(jié)構(gòu)化數(shù)據(jù)最好有過實(shí)踐經(jīng)驗(yàn)。這需要針對含有非英文字符的文本數(shù)據(jù)做一些特殊處理。

摘要: 1. 本文背景 很多行業(yè)的信息系統(tǒng)中,例如金融行業(yè)的信息系統(tǒng),相當(dāng)多的數(shù)據(jù)交互工作是通過傳統(tǒng)的文本文件進(jìn)行交互的。此外,很多系統(tǒng)的業(yè)務(wù)日志和系統(tǒng)日志由于各種原因并沒有進(jìn)入ELK之類的日志分析系統(tǒng),也是以文本文件的形式存在的。

本文背景

很多行業(yè)的信息系統(tǒng)中,例如金融行業(yè)的信息系統(tǒng),相當(dāng)多的數(shù)據(jù)交互工作是通過傳統(tǒng)的文本文件進(jìn)行交互的。此外,很多系統(tǒng)的業(yè)務(wù)日志和系統(tǒng)日志由于各種原因并沒有進(jìn)入ELK之類的日志分析系統(tǒng),也是以文本文件的形式存在的。隨著數(shù)據(jù)量的指數(shù)級增長,對超大文本文件的分析越來越成為挑戰(zhàn)。好在阿里云的MaxCompute產(chǎn)品從2.0版本開始正式支持了直接讀取并分析存儲在OSS上的文本文件,可以用結(jié)構(gòu)化查詢的方式去分析非結(jié)構(gòu)化的數(shù)據(jù)。

本文對使用MaxCompute分析OSS文本數(shù)據(jù)的實(shí)踐過程中遇到的一些問題和優(yōu)化經(jīng)驗(yàn)進(jìn)行了總結(jié)。作為前提,讀者需要詳細(xì)了解MaxCompute讀取OSS文本數(shù)據(jù)的一些基礎(chǔ)知識,對這篇官方文檔 《訪問 OSS 非結(jié)構(gòu)化數(shù)據(jù)》最好有過實(shí)踐經(jīng)驗(yàn)。本文所描述的內(nèi)容主要是針對這個(gè)文檔中提到的自定義Extractor做出的一些適配和優(yōu)化。

場景實(shí)踐

2.1 場景一:分析zip壓縮后的文本文件
場景說明
很多時(shí)候我們會對歷史的文本數(shù)據(jù)進(jìn)行壓縮,然后上傳到OSS上進(jìn)行歸檔,那么如果要對這部分?jǐn)?shù)據(jù)導(dǎo)入MaxCompute進(jìn)行離線分析,我們可以自定義Extractor讓MaxCompute直接讀取OSS上的歸檔文件,避免了把歸檔文件下載到本地、解壓縮、再上傳回OSS這樣冗長的鏈路。

實(shí)現(xiàn)思路
如 《訪問 OSS 非結(jié)構(gòu)化數(shù)據(jù)》文檔中所述,MaxCompute讀取OSS上的文本數(shù)據(jù)本質(zhì)上是讀取一個(gè)InputStream流,那么我們只要構(gòu)造出適當(dāng)?shù)臍w檔字節(jié)流,就可以直接獲取這個(gè)InputStream中的數(shù)據(jù)了。

以Zip格式的歸檔文件為例,我們可以參考 DataX 中關(guān)于讀取OSS上Zip文件的源碼,構(gòu)造一個(gè)Zip格式的InputStream,代碼見 ZipCycleInputStream.java 。構(gòu)造出這個(gè)Zip格式的InputStream后,在自定義Extractor中獲取文件流的部分就可以直接使用了,例如:

 private BufferedReader moveToNextStream() throws IOException {
        SourceInputStream stream = inputs.next();
        // ......
        ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream);
        return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192);
        // ......
     }

優(yōu)化經(jīng)驗(yàn)
大家可能知道,MaxCompute中進(jìn)行批量計(jì)算的時(shí)候,可以通過設(shè)置 odps.stage.mapper.split.size 這個(gè)參數(shù)來調(diào)整數(shù)據(jù)分片的大小,從而影響到執(zhí)行計(jì)算任務(wù)的Mapper的個(gè)數(shù),在一定程度上提高M(jìn)apper的個(gè)數(shù)可以增加計(jì)算的并行度,進(jìn)而提高計(jì)算效率 (但也不是說Mapper個(gè)數(shù)越多越好,因?yàn)檫@樣可能會造成較長時(shí)間的資源等待,或者可能會造成長尾的后續(xù)Reducer任務(wù),反而降低整體的計(jì)算效率) 。

同樣道理,對OSS上的文本文件進(jìn)行解析的時(shí)候,也可以通過設(shè)置 odps.sql.unstructured.data.split.size 這個(gè)參數(shù)來達(dá)到調(diào)整Mapper個(gè)數(shù)的目的 (注意這個(gè)參數(shù)可能需要提工單開通使用權(quán)限):

set odps.sql.unstructured.data.split.size=16;

上述設(shè)定的含義是,將OSS上的文件拆分為若干個(gè)16M左右大小的分片,讓MaxCompute盡力做到每個(gè)分片啟動(dòng)一個(gè)Mapper任務(wù)進(jìn)行計(jì)算——之所以說是“盡力做到”,是因?yàn)镸axCompute默認(rèn)不會對單個(gè)文件進(jìn)行拆分及分片處理(除非設(shè)定了其他參數(shù),我們后面會講到),也就是說,如果把單個(gè)分片按照上面的設(shè)定為16M,而OSS上某個(gè)文件大小假設(shè)為32M,則MaxCompute仍然會把這個(gè)文件整體(即32M)的數(shù)據(jù)量作為一個(gè)分片進(jìn)行Mapper任務(wù)計(jì)算。

注意點(diǎn)
我們在這個(gè)場景中處理的是壓縮后的文件,而InputStream處理的字節(jié)量大小是不會因壓縮而變小的。舉個(gè)例子,假設(shè)壓縮比為1:10,則上述這個(gè)32M的壓縮文件實(shí)際代表了320M的數(shù)據(jù)量,即MaxCompute會把1個(gè)Mapper任務(wù)分配給這320M的數(shù)據(jù)量進(jìn)行處理;同理假設(shè)壓縮比為1:20,則MaxCompute會把1個(gè)Mapper任務(wù)分配給640M的數(shù)據(jù)量進(jìn)行處理,這樣就會較大的影響計(jì)算效率。因此,我們需要根據(jù)實(shí)際情況調(diào)整分片參數(shù)的大小,并盡量把OSS上的壓縮文件大小控制在一個(gè)比較小的范圍內(nèi),從而可以靈活配置分片參數(shù),否則分片參數(shù)的值會因?yàn)槲募蟛⑶椅募粫徊鸱侄А?/p>

2.2 場景二:過濾文本文件中的特定行
場景說明
對于一些業(yè)務(wù)數(shù)據(jù)文件,特別是金融行業(yè)的數(shù)據(jù)交換文件,通常會有文件頭或文件尾的設(shè)定要求,即文件頭部的若干行數(shù)據(jù)是一些元數(shù)據(jù)信息,真正要分析的業(yè)務(wù)數(shù)據(jù)需要把這些元信息的行過濾掉,只分析業(yè)務(wù)數(shù)據(jù)部分的行,否則執(zhí)行結(jié)構(gòu)化查詢的SQL語句的時(shí)候必然會造成任務(wù)失敗。

實(shí)現(xiàn)思路
在 《訪問 OSS 非結(jié)構(gòu)化數(shù)據(jù)》文檔中提到的 代碼示例 中,對 readNextLine() 方法進(jìn)行一些改造,對讀取的每一個(gè)文件,即每個(gè) currentReader 讀取下一行的時(shí)候,記錄下來當(dāng)前處理的行數(shù),用這個(gè)行數(shù)判斷是否到達(dá)了業(yè)務(wù)數(shù)據(jù)行,如果未到業(yè)務(wù)數(shù)據(jù)行,則繼續(xù)讀取下一條記錄,如果已經(jīng)到達(dá)數(shù)據(jù)行,則將該行內(nèi)容返回處理;而當(dāng)跳轉(zhuǎn)到下一個(gè)文件的時(shí)候,將 該行數(shù)值重置。

代碼示例:

 private String readNextLine() throws IOException {
        if (firstRead) {
            firstRead = false;
            currentReader = moveToNextStream();
            if (currentReader == null) {
                return null;
            }
        }
        // 讀取行級數(shù)據(jù)
        while (currentReader != null) {
            String line = currentReader.readLine();
            if (line != null) {
                if (currentLine < dataLineStart) { // 若當(dāng)前行小于數(shù)據(jù)起始行,則繼續(xù)讀取下一條記錄
                    currentLine++;
                    continue;
                }
                if (!"EOF".equals(line)) { // 若未到達(dá)文件尾則將該行內(nèi)容返回,若到達(dá)文件尾則直接跳到下個(gè)文件
                    return line;
                }
            }
            currentReader = moveToNextStream();
            currentLine = 1;
        }
        return null;
    }

此處 dataLineStart 表示業(yè)務(wù)數(shù)據(jù)的起始行,可以通過 DataAttributes 在建立外部表的時(shí)候從外部作為參數(shù)傳入。當(dāng)然也可以隨便定義其他邏輯來過濾掉特定行,比如本例中的對文件尾的“EOF”行進(jìn)行了簡單的丟棄處理。

2.3 場景三:忽略文本中的空行
場景說明
在 《訪問 OSS 非結(jié)構(gòu)化數(shù)據(jù)》文檔中提到的 代碼示例 中,已可以應(yīng)對大多數(shù)場景下的文本數(shù)據(jù)處理,但有時(shí)候在業(yè)務(wù)數(shù)據(jù)文本中會存在一些空行,這些空行可能會造成程序的誤判,因此我們需要忽略掉這些空行,讓程序繼續(xù)分析處理后面有內(nèi)容的行。

實(shí)現(xiàn)思路
類似于上述 場景二 ,只需要判斷為空行后,讓程序繼續(xù)讀取下一行文本即可。
代碼示例:

public Record extract() throws IOException {

String line = readNextLine();
if (line == null) {
    return null;// 返回null標(biāo)志已經(jīng)讀取完成
}
while ("".equals(line.trim()) || line.length() == 0 || line.charAt(0) == "
" // 遇到空行則繼續(xù)處理
        || line.charAt(0) == "
") {
    line = readNextLine();
    if (line == null)
        return null;
}
return textLineToRecord(line);

}

2.4 場景四:選擇OSS上文件夾下的部分文件進(jìn)行處理
場景說明
閱讀 《訪問 OSS 非結(jié)構(gòu)化數(shù)據(jù)》文檔可知,一張MaxCompute的外部表連接的是OSS上的一個(gè)文件夾(嚴(yán)格來說OSS沒有“文件夾”這個(gè)概念,所有對象都是以O(shè)bject來存儲的,所謂的文件夾其實(shí)就是在OSS創(chuàng)建的一個(gè)字節(jié)數(shù)為0且名稱以“/”結(jié)尾的對象。MaxCompute建立外部表時(shí)連接的是OSS上這樣的以“/”結(jié)尾的對象,即連接一個(gè)“文件夾”),在處理外部表時(shí),默認(rèn)會對該文件夾下 所有的文件 進(jìn)行解析處理。該文件夾下所有的文件集合即被封裝為 InputStreamSet ,然后通過其 next() 方法來依次獲得每一個(gè)InputStream流、即每個(gè)文件流。

但有時(shí)我們可能會希望只處理OSS上文件夾下的 部分 文件,而不是全部文件,例如只分析那些文件名中含有“2018_”字樣的文件,表示只分析2018年以來的業(yè)務(wù)數(shù)據(jù)文件。

實(shí)現(xiàn)思路
在獲取到每一個(gè)InputStream的時(shí)候,通過 SourceInputStream 類的 getFileName() 方法獲取正在處理的文件流所代表的文件名,然后可以通過正則表達(dá)式等方式判斷該文件流是否為所需要處理的文件,如果不是則繼續(xù)調(diào)用 next() 方法來獲取下一個(gè)文件流。

代碼示例:

 private BufferedReader moveToNextStream() throws IOException {
        SourceInputStream stream = null;
        while ((stream = inputs.next()) != null) {
            String fileName = stream.getFileName();
            System.out.println("========inputs.next():" + fileName + "========");
            if (patternModel.matcher(fileName).matches()) {
                System.out.println(String
                        .format("- match fileName:[%s], pattern:[%s]", fileName, patternModel
                                .pattern()));
                ZipCycleInputStream zipCycleInputStream = new ZipCycleInputStream(stream);
                return new BufferedReader(new InputStreamReader(zipCycleInputStream, "UTF-8"), 8192);
            } else {
                 System.out.println(String.format(
                         "-- discard fileName:[%s], pattern:[%s]", fileName, patternModel.pattern()));
                continue;
            }
        }
        return null;
    }

本例中的 patternModel 為通過 DataAttributes 在建立外部表的時(shí)候從外部作為參數(shù)傳入的正則規(guī)則。

寫到這里可能有讀者會問,如果一個(gè)文件夾下有很多文件,比如上萬個(gè)文件,整個(gè)遍歷一遍后只選擇一小部分文件進(jìn)行處理這樣的方式會不會效率太低了?其實(shí)大可不必?fù)?dān)心,因?yàn)橄鄬τ贛axCompute對外部表執(zhí)行批量計(jì)算的過程,循環(huán)遍歷文件流的時(shí)間消耗是非常小的,通常情況下是不會影響批量計(jì)算任務(wù)的。

2.5 場景五:針對單個(gè)大文件進(jìn)行拆分
場景說明
在 場景一 中提到,要想提高計(jì)算效率,我們需要調(diào)整 odps.sql.unstructured.data.split.size 參數(shù)值來增加Mapper的并行度,但是對于單個(gè)大文件來講,MaxCompute默認(rèn)是不進(jìn)行拆分的,也就是說OSS上的單個(gè)大文件只會被分配給一個(gè)Mapper任務(wù)進(jìn)行處理,如果這個(gè)文件非常大的話,處理效率將會及其低下,我們需要一種方式來實(shí)現(xiàn)對單個(gè)文件進(jìn)行拆分,使其可以被多個(gè)Mapper任務(wù)進(jìn)行并行處理。

實(shí)現(xiàn)思路
仍然是要依靠調(diào)整 odps.sql.unstructured.data.split.size 參數(shù)來增加Mapper的并行度,并且設(shè)定 odps.sql.unstructured.data.single.file.split.enabled 參數(shù)來允許拆分單個(gè)文件 (同odps.sql.unstructured.data.split.size,該參數(shù)也可能需要提工單申請使用權(quán)限) ,例如:

set odps.sql.unstructured.data.split.size=128;
set odps.sql.unstructured.data.single.file.split.enabled=true;

設(shè)置好這些參數(shù)后,就需要編寫特定的Reader類來進(jìn)行單個(gè)大文件的拆分了。

核心的思路是,根據(jù) odps.sql.unstructured.data.split.size 所設(shè)定的值,大概將文件按照這個(gè)大小拆分開,但是拆分點(diǎn)極大可能會切在一條記錄的中間,這時(shí)就需要調(diào)整字節(jié)數(shù),向前或向后尋找換行符,來保證最終的切分點(diǎn)落在一整條記錄的尾部。具體的實(shí)現(xiàn)細(xì)節(jié)相對來講比較復(fù)雜,可以參考在 《訪問 OSS 非結(jié)構(gòu)化數(shù)據(jù)》文檔中提到的 代碼示例 來進(jìn)行分析。

注意點(diǎn)
在計(jì)算字節(jié)數(shù)的過程中,可能會遇到非英文字符造成計(jì)算切分點(diǎn)的位置計(jì)算不準(zhǔn)確,進(jìn)而出現(xiàn)讀取的字節(jié)流仍然沒有把一整行覆蓋到的情況。這需要針對含有非英文字符的文本數(shù)據(jù)做一些特殊處理。

代碼示例:

  @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        if (this.splitReadLen >= this.splitSize) {
            return -1;
        }
        if (this.splitReadLen + len >= this.splitSize) {
            len = (int) (this.splitSize - this.splitReadLen);
        }
        int readSize = this.internalReader.read(cbuf, off, len);
        int totalBytes = 0;
        for (char ch : cbuf) {
            String str = String.valueOf(ch);
            byte[] bytes = str.getBytes(charset);
            totalBytes += bytes.length;
        }
        this.splitReadLen += totalBytes;
        return readSize;
    }

其他建議

在編寫自定義Extractor的程序中,適當(dāng)加入System.out作為日志信息輸出,這些日志信息會在MaxCompute執(zhí)行時(shí)輸出在LogView的視圖中,對于調(diào)試過程和線上問題排查過程非常有幫助。
上文中提到通過調(diào)整 odps.sql.unstructured.data.split.size 參數(shù)值來適當(dāng)提高M(jìn)apper任務(wù)的并行度,但是并行度并不是越高越好,具體什么值最合適是與OSS上的文件大小、總數(shù)據(jù)量、MaxCompute產(chǎn)品自身的集群狀態(tài)緊密聯(lián)系在一起的,需要多次調(diào)試,并且可能需要與 odps.stage.reducer.num、odps.sql.reshuffle.dynamicpt、odps.merge.smallfile.filesize.threshold 等參數(shù)配合使用才能找到最優(yōu)值。并且由于MaxCompute產(chǎn)品自身的集群狀態(tài)也是很重要的因素,可能今天申請500個(gè)Mapper資源是很容易的事情,過幾個(gè)月就變成經(jīng)常需要等待很長時(shí)間才能申請到,這就需要持續(xù)關(guān)注任務(wù)的執(zhí)行時(shí)間并及時(shí)調(diào)整參數(shù)設(shè)定。
外部表的讀取和解析是依靠Extractor對文本的解析來實(shí)現(xiàn)的,因此在執(zhí)行效率上是遠(yuǎn)不能和MaxCompute的普通表相比的,所以在需要頻繁讀取和分析OSS上的文本文件的情況下,建議將OSS文件先 INSERT OVERWRITE 到MaxCompute中字段完全對等的一張普通表中,然后針對普通表進(jìn)行分析計(jì)算,這樣通常會獲得更好的計(jì)算效率。

原文鏈接

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/71255.html

相關(guān)文章

  • MaxCompute助力OSS支持EB級計(jì)算力

    摘要:作為阿里云大數(shù)據(jù)旗艦產(chǎn)品,的級別性能處理達(dá)到了全球領(lǐng)先性,被評為全球云端數(shù)據(jù)倉庫領(lǐng)導(dǎo)者。天弘基金天弘基金旗下的余額寶,是中國規(guī)模最大的貨幣基金。場景二阿里云產(chǎn)品消費(fèi)賬單分析準(zhǔn)備工作完成案例中準(zhǔn)備工作步驟。 摘要: 一、 MaxCompute是什么 你的OSS數(shù)據(jù)是否作堆積在一旁沉睡已久存儲成本變?yōu)槠髽I(yè)負(fù)擔(dān)你是否想喚醒沉睡的數(shù)據(jù)驅(qū)動(dòng)你的業(yè)務(wù)前行MaxCompute可以幫助你高效且低成本的...

    Enlightenment 評論0 收藏0
  • 如何在MaxCompute上處理存儲在OSS開源格式數(shù)據(jù)

    摘要:在之前的文章中,我們已經(jīng)介紹過怎樣在上對存儲在上的文本,音頻,圖像等格式的數(shù)據(jù),以及的數(shù)據(jù)進(jìn)行計(jì)算處理。外部表的必須與具體上存儲存儲數(shù)據(jù)的相符合。唯一不同的只是在內(nèi)部計(jì)算引擎將從上去讀取對應(yīng)的數(shù)據(jù)來進(jìn)行處理。 前言MaxCompute作為使用最廣泛的大數(shù)據(jù)平臺,內(nèi)部存儲的數(shù)據(jù)以EB量級計(jì)算。巨大的數(shù)據(jù)存儲量以及大規(guī)模計(jì)算下高性能數(shù)據(jù)讀寫的需求,對于MaxCompute提出了各種高要求及...

    lowett 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<