成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

從一段代碼談起——淺談JavaIO接口

pkwenda / 1812人閱讀

摘要:緩沖輸入流從被稱為緩沖區(qū)的存儲器區(qū)域讀出數(shù)據(jù)僅當(dāng)緩沖區(qū)是空時,本地輸入才被調(diào)用。同樣,緩沖輸出流,將數(shù)據(jù)寫入到緩存區(qū),只有當(dāng)緩沖區(qū)已滿才調(diào)用本機輸出。

:https://segmentfault.com/blog...

1.前言

前陣子休息天日常在尋找項目里不好的代碼,看到了這樣的一段代碼:

    private Result sshSameExec(Session session, String cmd) {
        if (log.isDebugEnabled()) {
            log.debug("shell command: {}", cmd);
        }
        UserInfo ui = getUserInfo();
        session.setUserInfo(ui);
        int exitStatus = 0;
        StringBuilder builder = new StringBuilder();
        ChannelExec channel;
        InputStream in;
        InputStream err;
        try {
            session.connect(connectTimeout);
            channel = (ChannelExec) session.openChannel("exec");
            channel.setCommand(cmd);
            in = channel.getInputStream();
            err = channel.getErrStream();
            channel.connect();
        } catch (Exception e) {
            throw new CloudRuntimeException(e);
        }

        try {
            long lastRead = Long.MAX_VALUE;
            byte[] tmp = new byte[1024];
            while (true) {
                while (in.available() > 0 || err.available() > 0) {
                    int i = 0;
                    if (in.available() > 0) {
                        i = in.read(tmp, 0, 1024);
                    } else if (err.available() > 0) {
                        i = err.read(tmp, 0, 1024);
                    }
                    if (i < 0) {
                        break;
                    }
                    lastRead = System.currentTimeMillis();
                    builder.append(new String(tmp, 0, i));
                }
                if (channel.isClosed()) {
                    if (in.available() > 0) {
                        continue;
                    }
                    exitStatus = channel.getExitStatus();
                    break;
                }
                if (System.currentTimeMillis() - lastRead > exeTimeout) {
                    break;
                }
            }
        } catch (IOException e) {
            throw new CloudRuntimeException(e);
        } finally {
            channel.disconnect();
            session.disconnect();
        }

        if (0 != exitStatus) {
            return Result.createByError(ErrorData.builder()
                    .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode())
                    .detail(builder.toString())
                    .title(ResultCode.EXECUTE_SSH_FAIL.toString())
                    .build());
        } else {
            return Result.createBySuccess(builder.toString());
        }
    }

簡單解釋一下這段代碼——即通過ssh到一臺機器上,然后執(zhí)行一些命令.對命令輸出的東西,開了一個循環(huán),每一次讀一定的位置,然后以字節(jié)流的形式讀回來.

這段代碼有點丑,于是我聞到了學(xué)習(xí)的味道.

首先是對兩個Stream的消費,很顯然,在多核環(huán)境下,我們同時也只能夠消費其中一個Stream.其次,這代碼太挫了,自己定義一個tmp,然后1024、1024這樣的去取出來.

在改良之前,我們先來回顧一下JavaIO的接口定義.

2.JavaIO 接口知識回顧 2.1 低級抽象接口:InputStream 和 OutputStream

這里有同學(xué)可能問了,為啥叫它低抽象接口呢?因為它離底層太近了,計算機本來就是處理二進制的,而這兩個接口正是用來處理二進制數(shù)據(jù)流的.

先簡單看一眼這兩個接口:

InputStream

**
 * This abstract class is the superclass of all classes representing
 * an input stream of bytes.
 *
 * 

Applications that need to define a subclass of InputStream * must always provide a method that returns the next byte of input. * * @author Arthur van Hoff * @see java.io.BufferedInputStream * @see java.io.ByteArrayInputStream * @see java.io.DataInputStream * @see java.io.FilterInputStream * @see java.io.InputStream#read() * @see java.io.OutputStream * @see java.io.PushbackInputStream * @since JDK1.0 */ public abstract class InputStream implements Closeable {.....}

OutputStream

/**
 * This abstract class is the superclass of all classes representing
 * an output stream of bytes. An output stream accepts output bytes
 * and sends them to some sink.
 * 

* Applications that need to define a subclass of * OutputStream must always provide at least a method * that writes one byte of output. * * @author Arthur van Hoff * @see java.io.BufferedOutputStream * @see java.io.ByteArrayOutputStream * @see java.io.DataOutputStream * @see java.io.FilterOutputStream * @see java.io.InputStream * @see java.io.OutputStream#write(int) * @since JDK1.0 */ public abstract class OutputStream implements Closeable, Flushable {...}

我們可以發(fā)現(xiàn),它們都實現(xiàn)了Closeable的接口.因此大家在使用這些原生類時,要注意在結(jié)束時調(diào)用Close方法哦.

這兩個接口的常用實現(xiàn)類有:
-?FileInputStreamFileOutputStream

DataInputStreamDataOutputStream

?ObjectInputStreamObjectOutputStream

2.2 高級抽象接口——Writer和Reader

為啥說它是高級抽象接口呢?我們先來看看它們的注釋:

Writer

/**
 * Abstract class for writing to character streams.  The only methods that a
 * subclass must implement are write(char[], int, int), flush(), and close().
 * Most subclasses, however, will override some of the methods defined here in
 * order to provide higher efficiency, additional functionality, or both.
 *
 * @see Writer
 * @see   BufferedWriter
 * @see   CharArrayWriter
 * @see   FilterWriter
 * @see   OutputStreamWriter
 * @see     FileWriter
 * @see   PipedWriter
 * @see   PrintWriter
 * @see   StringWriter
 * @see Reader
 *
 * @author      Mark Reinhold
 * @since       JDK1.1
 */

public abstract class Writer implements Appendable, Closeable, Flushable {

Reader

/**
 * Abstract class for reading character streams.  The only methods that a
 * subclass must implement are read(char[], int, int) and close().  Most
 * subclasses, however, will override some of the methods defined here in order
 * to provide higher efficiency, additional functionality, or both.
 *
 *
 * @see BufferedReader
 * @see   LineNumberReader
 * @see CharArrayReader
 * @see InputStreamReader
 * @see   FileReader
 * @see FilterReader
 * @see   PushbackReader
 * @see PipedReader
 * @see StringReader
 * @see Writer
 *
 * @author      Mark Reinhold
 * @since       JDK1.1
 */

public abstract class Reader implements Readable, Closeable {

我們可以看到,這個抽象類是用來面向character的,也就是字符.字符的抽象等級必然比字節(jié)高,因為字符靠近上層,即人類.

2.3 優(yōu)化輸入和輸出——Buffered

如果我們直接使用上述實現(xiàn)類去打開一個文件(如FileWriter FileReader 、FileInputStream 、FileOutputStream ),對其對象調(diào)用readwrite、readLine等,每個請求都是由基礎(chǔ)OS直接處理的,這會使一個程序效率低得多——因為它們都會引發(fā)磁盤訪問or網(wǎng)絡(luò)請求等.

為了減少這種開銷,Java 平臺實現(xiàn)緩沖 I/O 流。緩沖輸入流從被稱為緩沖區(qū)(buffer)的存儲器區(qū)域讀出數(shù)據(jù);僅當(dāng)緩沖區(qū)是空時,本地輸入 API 才被調(diào)用。同樣,緩沖輸出流,將數(shù)據(jù)寫入到緩存區(qū),只有當(dāng)緩沖區(qū)已滿才調(diào)用本機輸出 API。

用于包裝非緩存流的緩沖流類有4個:BufferedInputStreamBufferedOutputStream·用于創(chuàng)建字節(jié)緩沖字節(jié)流,?BufferedReaderBufferedWriter`用于創(chuàng)建字符緩沖字節(jié)流.

3. 著手優(yōu)化

之前,我們提到了這段代碼寫得搓的地方:

首先是對兩個Stream的消費,很顯然,在多核環(huán)境下,我們同時也只能夠消費其中一個Stream.

其次,這代碼太挫了,自己定義一個tmp,然后1024、1024這樣的去取出來.

故此,我們可以考慮對每個Stream都進行包裝,支持用線程去消費,其次我們可以用高級抽象分接口去適配Byte,然后去裝飾成Buffer.

接下來,我們來看一段ZStack里的工具類ShellUtils,為了節(jié)省篇幅,我們僅僅截出它在IDE里的
概覽:

run方法的核心:

        public ShellResult run() {
            StopWatch watch = new StopWatch();
            watch.start();
            try {
                if (withSudo) {
                    command = String.format("sudo %s", command);
                }

                ProcessBuilder pb = new ProcessBuilder(Arrays.asList("/bin/bash", "-c", command));
                if (baseDir == null) {
                    baseDir = System.getProperty("user.home");
                }
                pb.directory(new File(baseDir));

                process = pb.start();
                if (!suppressTraceLog && logger.isTraceEnabled()) {
                    logger.debug(String.format("exec shell command[%s]", command));
                }

                Writer stdout;
                int stdoutLog = stdoutLogStrategy();
                if (stdoutLog == LOG_TO_FILE) {
                    stdout = new BufferedWriter(new FileWriter(stdoutFile));
                } else if (stdoutLog == LOG_TO_SCREEN) {
                    stdout = new BufferedWriter(new OutputStreamWriter(System.out));
                } else {
                    stdout = new StringWriter();
                }

                Writer stderr;
                int stderrLog = stderrLogStrategy();
                if (stderrLog == LOG_TO_FILE) {
                    stderr = new BufferedWriter(new FileWriter(stderrFile));
                } else if (stderrLog == LOG_TO_SCREEN) {
                    stderr = new BufferedWriter(new OutputStreamWriter(System.err));
                } else {
                    stderr = new StringWriter();
                }

                StreamConsumer stdoutConsumer = new StreamConsumer(process.getInputStream(), new PrintWriter(stdout, true), stdoutLog != LOG_TO_FILE);
                StreamConsumer stderrConsumer = new StreamConsumer(process.getErrorStream(), new PrintWriter(stderr, true), stderrLog != LOG_TO_FILE);

                stderrConsumer.start();
                stdoutConsumer.start();
                process.waitFor();
                stderrConsumer.join(TimeUnit.SECONDS.toMillis(30));
                stdoutConsumer.join(TimeUnit.SECONDS.toMillis(30));

                ShellResult ret = new ShellResult();
                ret.setCommand(command);
                ret.setRetCode(process.exitValue());
                if (stderrLog == LOG_TO_STRING) {
                    ret.setStderr(stderr.toString());
                } else if (stderrLog == LOG_TO_FILE) {
                    stderr.close();
                }
                if (stdoutLog == LOG_TO_STRING) {
                    ret.setStdout(stdout.toString());
                } else if (stdoutLog == LOG_TO_FILE) {
                    stdout.close();
                }

                return ret;
            } catch (Exception e) {
                StringBuilder sb = new StringBuilder();
                sb.append("Shell command failed:
");
                sb.append(command);
                throw new ShellException(sb.toString(), e);
            } finally {
                if (process != null) {
                    process.destroy();
                }
                watch.stop();
                if (!suppressTraceLog && logger.isTraceEnabled()) {
                    logger.trace(String.format("shell command[%s] costs %sms to finish", command, watch.getTime()));
                }
            }
        }
    }

我們可以看到StreamConsumer這個類,我們來看一下它的代碼:

    private static class StreamConsumer extends Thread {
        final InputStream in;
        final PrintWriter out;
        final boolean flush;

        StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) {
            this.in = in;
            this.out = out;
            flush = flushEveryWrite;
        }

        @Override
        public void run() {
            BufferedReader br = null;
            try {
                br = new BufferedReader(new InputStreamReader(in));
                String line;
                while ( (line = br.readLine()) != null) {
                    out.println(line);
                    if (flush) {
                        out.flush();
                    }
                }
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            } finally {
                try {
                    if (br != null) {
                        br.close();
                    }
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

這段代碼已經(jīng)達到了我們的理想狀態(tài):線程消費,高級抽象.

3.1 使用Kotlin 3.1.1 Kotlin IO

閑話不多說,先貼代碼為敬:

import java.io.InputStream
import java.io.InputStreamReader

class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable {

    override fun run() {
        InputStreamReader(inputStream).buffered().use {
            it.lines().forEach { r -> result.append(r) }
        }
    }
}

還是一樣熟悉的配方,我們逐行來解讀:

定義一個類,并且要求構(gòu)造函數(shù)必須傳入InputStream和一個StringBuilder.且實現(xiàn)了Runnable接口,這意味著它可以被線程消費.

覆寫run方法.我們可以看到InputStream被適配成了InputStreamReader,這意味著它可以輸出字符流了,然后我們使用了Kotlin的接口將其裝飾成了Buffer.

讀每一行buffer,并appned到result這個StringBuilder里去.

讀完就可以告辭了,close.(use會將其關(guān)閉)

3.1.2 Kotlin Coroutine

先看一下上面的圖,我們都知道內(nèi)核態(tài)線程是由OS調(diào)度的,但當(dāng)一個線程拿到時間片時,卻調(diào)到了阻塞IO,那么只能等在那邊,浪費時間.

而協(xié)程則可以解決這個問題,當(dāng)一個Jobhang住的時候,可以去做別的事情,繞開阻塞.更好的利用時間片.

最后,我們來看一下成品代碼:

    override fun sshExecWithCoroutine(session: Session, cmd: String): SimpleResult {
        val ui = InnerUserInfo()
        session.userInfo = ui
        val exitStatus: Int
        var channel = ChannelExec()
        val inputBuilder = StringBuilder()
        val errorBuilder = StringBuilder()
        try {
            session.connect(connectTimeout)
            channel = session.openChannel("exec") as ChannelExec
            channel.setCommand(cmd)
            channel.connect()
            val inputStream = StreamGobbler(channel.inputStream, inputBuilder)
            val errStream = StreamGobbler(channel.errStream, errorBuilder)

            val customJob = GlobalScope.launch {
                customStream(inputStream, errStream)
            }

            while (!customJob.isCompleted) {
                // wait job be done
            }

            exitStatus = channel.exitStatus
        } catch (e: IOException) {
            throw java.lang.RuntimeException(e)
        } finally {
            if (channel.isConnected) {
                channel.disconnect()
            }
            if (session.isConnected) {
                session.disconnect()
            }
        }

        return if (0 != exitStatus) {
            return SimpleResult.createByError(ErrorData.Builder()
                    .errorCode(ResultCode.EXECUTE_SSH_FAIL.value)
                    .detail(errorBuilder.toString())
                    .title(ResultCode.EXECUTE_SSH_FAIL.toString())
                    .build())

        } else {
            SimpleResult.createBySuccess(inputBuilder.toString())
        }
    }


    private suspend fun customStream(inputStream: StreamGobbler, errorStream: StreamGobbler) {
        val inputDeferred = GlobalScope.async {
            inputStream.run()
        }
        val errorDeferred = GlobalScope.async {
            errorStream.run()
        }

        inputDeferred.join()
        errorDeferred.join()
    }

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75095.html

相關(guān)文章

  • Java知識點總結(jié)(JavaIO-字符流)

    摘要:使用字節(jié)流寫入文件,如果沒有關(guān)閉字節(jié)流操作,文件依然存在內(nèi)容,說明字節(jié)流是操作文件本身的。字節(jié)流比字符流更好,使用更廣泛。 Java知識點總結(jié)(JavaIO-字符流) @(Java知識點總結(jié))[Java, JavaIO] [toc] 在程序中一個字符等于兩個字節(jié),那么 Java 提供了 Reader 和 Writer 兩個專門操作字符流的類。 字符輸出流:Writer 類定義如下: p...

    thekingisalwaysluc 評論0 收藏0
  • Java知識點總結(jié)(JavaIO-異常)

    摘要:知識點總結(jié)異常知識點總結(jié)異常為什么需要異常機制不是所有的問題都能在編譯時被發(fā)現(xiàn),有些問題在程序運行時才會暴露出來異常機制是面向?qū)ο蟮奶幚沓绦蛟谶\行時發(fā)生的狀況的手段使用異常機制不會打亂原有業(yè)務(wù)邏輯的用塊把可能出異常的代碼保護起來用一個 Java知識點總結(jié)(JavaIO-異常) @(Java知識點總結(jié))[Java, Java異常] [toc] 為什么需要異常機制 不是所有的問題都能在編譯...

    Near_Li 評論0 收藏0
  • Java知識點總結(jié)(JavaIO-內(nèi)存操作流)

    摘要:知識點總結(jié)內(nèi)存操作流知識點總結(jié)前面所講的程序中輸入輸出都是從文件中來,當(dāng)然也可以將輸出的位置設(shè)置在內(nèi)存上。將內(nèi)容寫入到內(nèi)存中。 Java知識點總結(jié)(JavaIO-內(nèi)存操作流) @(Java知識點總結(jié))[Java, JavaIO] [toc] showImg(https://segmentfault.com/img/bV82tm?w=753&h=275); 前面所講的程序中輸入、輸出都是...

    Half 評論0 收藏0
  • JavaIOJavaIO輸入輸出流

    摘要:下面我們使用字節(jié)輸入輸出流來說明這個問題輸入流一般是由對象如建立的,當(dāng)新建一個時,對象建立了一個包含有數(shù)據(jù)的管道其實就是我們所說的這個流并將對象存儲的數(shù)據(jù)輸入到管道中,因此管道里的數(shù)據(jù)流就是對象內(nèi)的數(shù)據(jù)。 流的原理: showImg(/img/bVqa89); 一連串有順序的數(shù)據(jù)系列可以看成是一個流。 輸入輸出流: 數(shù)據(jù)從IO輸入到程序的流是輸入流,數(shù)據(jù)從程序輸出到IO的流是輸出流。 ...

    CloudwiseAPM 評論0 收藏0
  • Java知識點總結(jié)(JavaIO- Scanner類 )

    摘要:知識點總結(jié)類知識點總結(jié)后提供的輸入數(shù)據(jù)類,此類位于包中,不僅可以完成輸入數(shù)據(jù)的操作,還可以方便地對輸入數(shù)據(jù)進行驗證。 Java知識點總結(jié)(JavaIO- Scanner類 ) @(Java知識點總結(jié))[Java, JavaIO] showImg(https://segmentfault.com/img/bV9dAj?w=838&h=396); JDK 1.5后提供的輸入數(shù)據(jù)類,此類位于...

    CarlBenjamin 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<