成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

修改Flume源碼使taildir source支持遞歸(可配置)

fuyi501 / 1649人閱讀

摘要:的選哪個(gè)首選斷點(diǎn)還原可以記錄偏移量可配置文件組,里面使用正則表達(dá)式配置多個(gè)要監(jiān)控的文件就憑第一點(diǎn)其他的都被比下去了這么好的有一點(diǎn)不完美,不能支持遞歸監(jiān)控文件夾。

Flume的source選哪個(gè)?
taildir source首選!
1.斷點(diǎn)還原 positionFile可以記錄偏移量
2.可配置文件組,里面使用正則表達(dá)式配置多個(gè)要監(jiān)控的文件
就憑第一點(diǎn)其他的source都被比下去了!
這么好的taildir source有一點(diǎn)不完美,不能支持遞歸監(jiān)控文件夾。
所以就只能修改源代碼了……好玩,我喜歡~

改源碼,先讀源碼

Flume的taildir source啟動(dòng)會(huì)調(diào)用start()方法作初始化,里面創(chuàng)建一個(gè)ReliableTaildirEventReader,這里用到了建造者模式

@Override
public synchronized void start() {
    logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
    try {
        reader = new ReliableTaildirEventReader.Builder()
                .filePaths(filePaths)
                .headerTable(headerTable)
                .positionFilePath(positionFilePath)
                .skipToEnd(skipToEnd)
                .addByteOffset(byteOffsetHeader)
                .cachePatternMatching(cachePatternMatching)
                .recursive(isRecursive)
                .annotateFileName(fileHeader)
                .fileNameHeader(fileHeaderKey)
                .build();
    } catch (IOException e) {
        throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
    }
    idleFileChecker = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
    idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
            idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);

    positionWriter = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
    positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
            writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("TaildirSource started");
    sourceCounter.start();
}

taildir source屬于PollableSource,

/**
 * A {@link Source} that requires an external driver to poll to determine
 * whether there are {@linkplain Event events} that are available to ingest
 * from the source.
 *
 * @see org.apache.flume.source.EventDrivenSourceRunner
 */
public interface PollableSource extends Source {
    ...

這段注釋的意思是PollableSource是需要一個(gè)外部驅(qū)動(dòng)去查看有沒(méi)有需要消費(fèi)的事件,從而拉取事件,講白了就是定時(shí)拉取。所以flume也不一定是真正實(shí)時(shí)的,只是隔一會(huì)兒不停地來(lái)查看事件而已。(與之相應(yīng)的是另一種EventDrivenSourceRunner)
那么taildir source在定時(shí)拉取事件的時(shí)候是調(diào)用的process方法

@Override
public Status process() {
    Status status = Status.READY;
    try {
        existingInodes.clear();
        existingInodes.addAll(reader.updateTailFiles());
        for (long inode : existingInodes) {
            TailFile tf = reader.getTailFiles().get(inode);
            if (tf.needTail()) {
                tailFileProcess(tf, true);
            }
        }
        closeTailFiles();
        try {
            TimeUnit.MILLISECONDS.sleep(retryInterval);
        } catch (InterruptedException e) {
            logger.info("Interrupted while sleeping");
        }
    } catch (Throwable t) {
        logger.error("Unable to tail files", t);
        status = Status.BACKOFF;
    }
    return status;
}

重點(diǎn)就是下面這幾行

existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
} }
reader.updateTailFiles()獲取需要監(jiān)控的文件,然后對(duì)每一個(gè)進(jìn)行處理,查看最后修改時(shí)間,判定是否需要tail,需要tailtail
那么進(jìn)入reader.updateTailFiles()
for (TaildirMatcher taildir : taildirCache) {
      Map headers = headerTable.row(taildir.getFileGroup());

      for (File f : taildir.getMatchingFiles()) {
        long inode = getInode(f);
        TailFile tf = tailFiles.get(inode);
        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
          long startPos = skipToEnd ? f.length() : 0;
          tf = openFile(f, headers, inode, startPos);

遍歷每一個(gè)正則表達(dá)式匹配對(duì)應(yīng)的匹配器,每個(gè)匹配器去獲取匹配的文件!taildir.getMatchingFiles()

List getMatchingFiles() {
    long now = TimeUnit.SECONDS.toMillis(
            TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
    long currentParentDirMTime = parentDir.lastModified();
    List result;

    // calculate matched files if
    // - we don"t want to use cache (recalculate every time) OR
    // - directory was clearly updated after the last check OR
    // - last mtime change wasn"t already checked for sure
    //   (system clock hasn"t passed that second yet)
    if (!cachePatternMatching ||
            lastSeenParentDirMTime < currentParentDirMTime ||
            !(currentParentDirMTime < lastCheckedTime)) {
        lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive));
        lastSeenParentDirMTime = currentParentDirMTime;
        lastCheckedTime = now;
    }

    return lastMatchedFiles;
}

可以看到getMatchingFilesNoCache(isRecursive)就是獲取匹配的文件的方法,也就是需要修改的方法了!
ps:這里的isRecursive是我加的~
點(diǎn)進(jìn)去:

private List getMatchingFilesNoCache() {
    List result = Lists.newArrayList();
    try (DirectoryStream stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
        for (Path entry : stream) {
            result.add(entry.toFile());
        }
    } catch (IOException e) {
        logger.error("I/O exception occurred while listing parent directory. " +
                "Files already matched will be returned. " + parentDir.toPath(), e);
    }
    return result;
}

源碼是用了Files.newDirectoryStream(parentDir.toPath(), fileFilter)),將父目錄下符合正則表達(dá)式的文件都添加到一個(gè)迭代器里。(這里還用了try (...)的語(yǔ)法糖)


找到地方了,開(kāi)始改!

我在這個(gè)getMatchingFilesNoCache()方法下面下了一個(gè)重載的方法, 可增加擴(kuò)展性:

private List getMatchingFilesNoCache(boolean recursion) {
    if (!recursion) {
        return getMatchingFilesNoCache();
    }
    List result = Lists.newArrayList();
    // 使用非遞歸的方式遍歷文件夾
    Queue dirs = new ArrayBlockingQueue<>(10);
    dirs.offer(parentDir);
    while (dirs.size() > 0) {
        File dir = dirs.poll();
        try {
            DirectoryStream stream = Files.newDirectoryStream(dir.toPath(), fileFilter);
            stream.forEach(path -> result.add(path.toFile()));
        } catch (IOException e) {
            logger.error("I/O exception occurred while listing parent directory. " +
                    "Files already matched will be returned. (recursion)" + parentDir.toPath(), e);
        }
        File[] dirList = dir.listFiles();
        assert dirList != null;
        for (File f : dirList) {
            if (f.isDirectory()) {
                dirs.add(f);
            }
        }
    }
    return result;
}

我使用了非遞歸的方式遍歷文件夾,就是樹(shù)到隊(duì)列的轉(zhuǎn)換。
到這里,核心部分就改完了。接下來(lái)要處理這個(gè)recursion的參數(shù)


華麗的分割線后,順騰摸瓜!

一路改構(gòu)造方法,添加這個(gè)參數(shù),最終參數(shù)從哪來(lái)呢?
flume的source啟動(dòng)時(shí)會(huì)調(diào)用configure方法,將Context中的內(nèi)容配置進(jìn)reader等對(duì)象中。
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
contextTaildirSourceConfigurationConstants中獲取配置名和默認(rèn)值

  /**
   * Whether to support recursion. */
  public static final String RECURSIVE = "recursive";
  public static final boolean DEFAULT_RECURSIVE = false;

這里的recursive也就是flume配置文件里配置項(xiàng)了

# Whether to support recusion
a1.sources.r1.recursive = true
大功告成,打包試試!

用maven只對(duì)這一個(gè)module打包。我把這個(gè)module的pom改了下artifactId,加上了自己名字作個(gè)紀(jì)念,哈哈
可惜pom里面不能寫(xiě)中文……

org.apache.flume.flume-ng-sources
flume-taildir-source-recursive-by-Wish000
Flume Taildir Source

執(zhí)行package將其放在flume的lib下,替換原來(lái)的flume-taildir-source***.jar
啟動(dòng),測(cè)試,成功!

具體代碼見(jiàn)GitHub地址:https://github.com/Wish000/me...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/36029.html

相關(guān)文章

  • 修改Flume源碼使taildir source支持遞歸配置

    摘要:的選哪個(gè)首選斷點(diǎn)還原可以記錄偏移量可配置文件組,里面使用正則表達(dá)式配置多個(gè)要監(jiān)控的文件就憑第一點(diǎn)其他的都被比下去了這么好的有一點(diǎn)不完美,不能支持遞歸監(jiān)控文件夾。 Flume的source選哪個(gè)?taildir source首選!1.斷點(diǎn)還原 positionFile可以記錄偏移量2.可配置文件組,里面使用正則表達(dá)式配置多個(gè)要監(jiān)控的文件就憑第一點(diǎn)其他的source都被比下去了!這么好的t...

    tylin 評(píng)論0 收藏0
  • 微服務(wù)之分布式文件系統(tǒng)

    摘要:于是便誕生了隨行付分布式文件系統(tǒng)簡(jiǎn)稱(chēng),提供的海量安全低成本高可靠的云存儲(chǔ)服務(wù)。子系統(tǒng)相關(guān)流程圖如下核心實(shí)現(xiàn)主要為隨行付各個(gè)業(yè)務(wù)系統(tǒng)提供文件共享和訪問(wèn)服務(wù),并且可以按應(yīng)用統(tǒng)計(jì)流量命中率空間等指標(biāo)。 背景 傳統(tǒng)Web應(yīng)用中所有的功能部署在一起,圖片、文件也在一臺(tái)服務(wù)器;應(yīng)用微服務(wù)架構(gòu)后,服務(wù)之間的圖片共享通過(guò)FTP+Nginx靜態(tài)資源的方式進(jìn)行訪問(wèn),文件共享通過(guò)nfs磁盤(pán)掛載的方式進(jìn)行訪問(wèn)...

    stormjun 評(píng)論0 收藏0
  • Flume日志采集框架的使

    摘要:對(duì)于一般的采集需求,通過(guò)對(duì)的簡(jiǎn)單配置即可實(shí)現(xiàn)。針對(duì)特殊場(chǎng)景也具備良好的自定義擴(kuò)展能力,因此,可以適用于大部分的日常數(shù)據(jù)采集場(chǎng)景。 文章作者:foochane? 原文鏈接:https://foochane.cn/article/2019062701.html Flume日志采集框架 安裝和部署 Flume運(yùn)行機(jī)制 采集靜態(tài)文件到hdfs 采集動(dòng)態(tài)日志文件到hdfs 兩個(gè)agent級(jí)聯(lián) F...

    laznrbfe 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<