

Exporting Large Data Reports with Hadoop HDFS and the Open-Source Excel Library POI (Part 1)

luffyZh / 1012 reads

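Abstract: The Java projects used here are all based on spring+dubbo/dubbox. HDFS has much in common with other distributed file systems, but it also differs from them clearly. It provides high-throughput data access, which makes it well suited to applications with large data sets. It relaxes some POSIX constraints to allow streaming reads of file system data. HDFS is part of the Apache Hadoop Core project.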

Keywords

Java, PHP, HDFS, MqRocket, Excel, POI, reports

Background

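On the business side, nearly every company has some kind of report export job. When the volume is small, writing to an output stream or echoing a string is enough, as long as the response carries the right Content-Type. Exporting a large amount of data is another matter, especially for reports such as orders, where the business logic is complex and the export has to apply various filters and permissions. Preparing the data is already laborious, and export requests rarely cover just a day or two: they usually span half a month or a month, and even at a small company that can reach more than a hundred thousand rows.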

function generateExcel($filename, $header, array &$data)
{
    generateDownHeader($filename);

    // The HTML tags were stripped from this listing in the source;
    // a plain HTML table that Excel can open is assumed here.
    $rs = "<table><tr>";
    if (is_string($header)) {
        $header = explode(",", $header);
    }
    foreach ($header as $v) {
        $rs .= "<th>".$v."</th>";
    }
    $rs .= "</tr>";
    foreach ($data as $coll) {
        $rs .= "<tr>";
        foreach ($coll as $v) {
            if (AppHelper::isDouble($v)) {
                // numeric cell; the original cell attributes are not recoverable
                $rs .= "<td>".$v."</td>";
            } else {
                $rs .= "<td>".$v."</td>";
            }
        }
        $rs .= "</tr>";
    }

    $rs .= "</table>";
    echo $rs;
    exit;
}

function generateDownHeader($filename)
{
    header("Content-Type: application/force-download");
    header("Content-Type: application/octet-stream");
    header("Content-Type: application/download");
    header("Content-Disposition:inline;filename=\"".$filename."\"");
    header("Content-Transfer-Encoding: binary");
    header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT");
    header("Cache-Control: must-revalidate, post-check=0, pre-check=0");
    header("Pragma: no-cache");
}

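For a hundred thousand rows or more, the usual approach (the code above) is probably not workable (I did not try other common approaches). In PHP the data is typically fetched by calling an API with curl, and both nginx and the curl request usually time out after 30s, which is about what it takes to export 10,000 rows. If the server is powerful and has multiple cores, curl_multi can issue the requests in parallel; on a weaker server that approach may well take longer.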

Here is the curl wrapper I use to call the API:

function curl($url, $option = null, $method = "POST", $getCode = false, $header = [])
{
    $curl = curl_init ();
    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_TIMEOUT, 30);
    if (!array_key_exists("Content-Type", $header)) {
        $header["Content-Type"] = "application/json;charset=UTF-8";
    }
    $headers = [];
    if ($header) {
        foreach ($header as $k=>$v) {
            $headers[] = $k.": ".$v;
        }
    }
    curl_setopt($curl, CURLOPT_HTTPHEADER, $headers);
    if ($option) {
        if (is_array($option)) {
            $option = json_encode($option);
        }
        curl_setopt($curl, CURLOPT_POSTFIELDS, $option);
    }
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $method);
    $result = curl_exec($curl);
    if ($getCode) {
        $curl_code = curl_getinfo($curl, CURLINFO_HTTP_CODE);
        $message = self::isJson($result) ? json_decode($result, true) : $result;
        $result = ["code" => $curl_code];
        if (isset($message["exception"]) && count($message) == 1) {
            $result["exception"] = $message["exception"];
            $result["result"] = null;
        } else {
            $result["result"] = $message;
        }
    }
    curl_close($curl);
    return $result;
}

Because of the data volume, I later switched to parallel requests:

function curlMulti(array $urls, $options = null, $method = "POST",  $getCode = false, $header = []) 
{
    $mh = curl_multi_init();
    // add the individual curl handles to the multi handle
    $handles = $contents = [];
    foreach ($urls as $key => $url) {
        $handles[$key] = curl_init($url);
        curl_setopt($handles[$key], CURLOPT_RETURNTRANSFER, 1);
        curl_setopt($handles[$key], CURLOPT_TIMEOUT, 30);
        curl_setopt($handles[$key], CURLOPT_CUSTOMREQUEST, $method);

        if (!array_key_exists("Content-Type", $header)) {
            $header["Content-Type"] = "application/json;charset=utf-8";
        }
        $headers = [];
        if ($header) {
            foreach ($header as $k => $val) {
                $headers[] = $k.": ".$val;
            }
        }
        curl_setopt($handles[$key], CURLOPT_HTTPHEADER, $headers);
        if ($options) {
            if (is_array($options)) {
                $options = json_encode($options);
            }
            curl_setopt($handles[$key], CURLOPT_POSTFIELDS, $options);
        }
        curl_multi_add_handle($mh, $handles[$key]);
    }
    // execute the multi handle
    /*$active = null;
    do{
        $mrc = curl_multi_exec($mh, $active);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);

    while ($active and $mrc == CURLM_OK) {
        if (curl_multi_select($mh) === -1) {
            usleep(100);
            do {
                $mrc = curl_multi_exec($mh, $active);
            }while($mrc == CURLM_CALL_MULTI_PERFORM);
        }
    }// collect the batch results
    $errors = [];
    foreach ($handles as $k => $ch) {
        $errors[$k] = curl_error($ch);
        $content = curl_multi_getcontent($ch);
        if ($getCode) {
            $content = curl_errno($ch) == 0 && self::isJson($content)? json_decode($content,true) : [];
        }
        $contents = array_merge($contents,$content);

    }
    $info = curl_multi_info_read($mh);*/
    $output = $errors = $infos = [];
    $result = ["code" => null];
    do {
        while (($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);
        if ($execrun != CURLM_OK) {
            break;
        }
        // drain every transfer that has finished so far
        while ($done = curl_multi_info_read($mh)) {
            $info = curl_getinfo($done["handle"]);
            $infos["http_code"][] = $info["http_code"];
            $result["code"] = $info["http_code"];
            $infos["url"][] = $info["url"];
            $errors[] = curl_error($done["handle"]);
            $body = curl_multi_getcontent($done["handle"]);
            if (self::isJson($body)) {
                $output = array_merge($output, json_decode($body, true));
            }
        }
        // wait for activity outside the drain loop, so the script also
        // blocks while no transfer has finished yet instead of spinning
        if ($running) {
            curl_multi_select($mh, 30);
        }
    } while ($running);

    $result["result"] = $output;
    $result["exception"] = $errors;
    $result["info"] = $infos;
    foreach ($handles as $ch) {
        curl_multi_remove_handle($mh, $ch);
    }
    curl_multi_close($mh);
    return $result;
}

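Part of the code above is commented out. In principle the commented-out version should behave the same as the version below it, and in practice it did. By "the same" I do not mean the data returned by the parallel requests. Because the requests run in parallel, they compete for resources and finish at different times, so the results came back jumbled: the exported Excel rows followed no particular sort order, because there is no way to know which request returns first. That was the first problem. Second, varying amounts of data went missing during the export. Suppose each request fetches 500 rows: when I verified the data, only 300-odd rows had come back, and the number changed from run to run. Third, with too much data nginx still timed out, with error code 504.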
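The ordering problem described above is not specific to PHP: any concurrent fetch returns results in completion order unless the caller reassembles them. As a minimal sketch in Java (the language used for the rest of this article), the class below starts one asynchronous call per page and then joins the futures in submission order, so the merged rows follow page order regardless of which call finishes first. `OrderedPageFetcher` and the `fetchPage` callback are hypothetical names used only for illustration; `fetchPage` stands in for the real API call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

public class OrderedPageFetcher {

    /**
     * Fetches pages 1..pageCount concurrently and returns all rows in page order.
     * fetchPage is a hypothetical callback standing in for the real API call.
     */
    public static <T> List<T> fetchAll(int pageCount, IntFunction<List<T>> fetchPage) {
        List<CompletableFuture<List<T>>> futures = new ArrayList<>();
        for (int page = 1; page <= pageCount; page++) {
            final int p = page;
            // each page is requested on the common pool without waiting for the others
            futures.add(CompletableFuture.supplyAsync(() -> fetchPage.apply(p)));
        }
        List<T> rows = new ArrayList<>();
        // join in submission order, so completion order does not affect the result
        for (CompletableFuture<List<T>> future : futures) {
            rows.addAll(future.join());
        }
        return rows;
    }
}
```

Joining in submission order keeps the code simple at the cost of holding every page in memory until the pages before it have arrived; a failed page surfaces as an exception from `join()` instead of silently missing rows.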

PS: Why not use the third-party PHPExcel package in PHP? The reason is simple: testing showed that PHPExcel uses more memory than the machine could handle, so I left it out.

Initial solution

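Since parallel requests in PHP could not solve the problem, I had to look for another approach. The most reliable one, and the one everybody thinks of, is a queue: put the export request on a queue, return to the client immediately with a message that the job is being processed, let a consumer do the actual export, and finally report the result back to the client.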
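The queue pattern described here can be sketched in a few lines of Java. This is an in-process illustration only, assuming a `BlockingQueue` in place of a real message broker; `ExportQueue`, `submit`, and `processOne` are hypothetical names, and the actual export work is reduced to recording the job.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ExportQueue {
    private final BlockingQueue<String> jobs = new LinkedBlockingQueue<>();
    private final List<String> finished = Collections.synchronizedList(new ArrayList<>());

    /** Request side: enqueue the job and return immediately. */
    public String submit(String jobJson) {
        jobs.add(jobJson);
        return "accepted";
    }

    /**
     * Consumer side: waits up to timeoutMillis for a job and processes it.
     * Returns false if no job arrived in time or the thread was interrupted.
     */
    public boolean processOne(long timeoutMillis) {
        try {
            String job = jobs.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (job == null) {
                return false;
            }
            // the real export (query the data, build the file, store it) would go here
            finished.add(job);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Snapshot of the jobs processed so far, for reporting back to the client. */
    public List<String> finishedJobs() {
        return new ArrayList<>(finished);
    }
}
```

The request handler only calls `submit`, so its response time no longer depends on the size of the export; a separate thread or process would call `processOne` in a loop.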

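PHP has plenty of options for this kind of work; common ones include Swoole, Workerman, and Gearman. I chose Gearman because it is convenient. Swoole used to be in our project but was later removed, for reasons I do not know.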

Demo code for the Gearman worker:

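        // (the start of this listing, the worker class and method declaration, is missing from the source)
        $worker = new GearmanWorker();
        $worker->addServer();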
        $worker->addFunction("export", function (GearmanJob $job) {
            $workload = $job->workload();

            if (($data = $this->parseJson($workload)) == false) {
                return AppHelper::encodeJson(["code" => "-1", "result" => null, "exception" => "invalid parameters"]);
            }
            $user = isset($data["user"]) && !empty($data["user"]) ? $data["user"] : "guest";
            $path = dirname(Yii::$app->basePath) . "/backend/downloads/" . sha1($user) . "/" . date("Y-m-d") . "/";
            $filename = isset($data["filename"]) && !empty($data["filename"]) ? $data["filename"] : date("Y-m-d") . "-order.xls";
            $rs = $this->getData($data["type"]["data"], $data["type"]["count"], $data["api"], $data["params"]);
            $this->writeExcel($path, $filename, $rs, $data["header"]);
            return 200;
        });
        // loop forever; Gearman blocks internally while waiting for jobs, so the loop does not hog the CPU
        while ($worker->work()) {
            if ($worker->returnCode() !== GEARMAN_SUCCESS) {
                echo "error" . PHP_EOL;
            }
        }
    }

    public function parseJson($str)
    {
        $data = json_decode($str, true);
        return (json_last_error() == JSON_ERROR_NONE) ? $data : false;
    }

    public function writeExcel($path, $filename, $data, $header)
    {
        if ($this->mkDir($path)) {
            $data = $this->assembleData($data);
            $rs = $this->generateExcel($header, $data);
            file_put_contents(rtrim($path, "/") . "/" . $filename, $rs);
        } else {
            echo "Directory does not exist, failed to write the file!";
        }
        return;

    }

    public function getData($dataApi, $countApi, $api, $params)
    {
        $start = microtime(true);
        $count = AppHelper::getData($api . $countApi . "?" . http_build_query($params));
        echo $api . $countApi . "?" . http_build_query($params).PHP_EOL;
        echo "Total rows: " . $count . PHP_EOL;
        $params["perpage"] = 500;
        $times = ceil($count / $params["perpage"]);
        $data = [];
        if ($count > 0) {
            for ($i = 0; $i < $times; $i++) {
                $params["page"] = $i + 1;
                $rs = AppHelper::getData($api . $dataApi . "?" . http_build_query($params));
                $data = array_merge($data, $rs);
            }
        }
        $end = microtime(true);
        echo "Elapsed time: " . ($end - $start) . PHP_EOL;
        return $data;
    }

    public function generateExcel($header, array &$data)
    {
        // The HTML tags were stripped from this listing in the source;
        // a plain HTML table that Excel can open is assumed here.
        $rs = "<table><tr>";
        if (is_string($header)) {
            $header = explode(",", $header);
        }
        foreach ($header as $v) {
            $rs .= "<th>" . $v . "</th>";
        }
        $rs .= "</tr>";
        foreach ($data as $coll) {
            $rs .= "<tr>";
            foreach ($coll as $v) {
                if (AppHelper::isDouble($v)) {
                    // numeric cell; the original cell attributes are not recoverable
                    $rs .= "<td>" . $v . "</td>";
                } else {
                    $rs .= "<td>" . $v . "</td>";
                }
            }
            $rs .= "</tr>";
        }

        $rs .= "</table>";
        unset($data);
        return $rs;
    }

    public function assembleData($rs)
    {
        // load the users referenced by the orders, indexed by uid
        $users = [];
        if ($rs) {
            $uids = array_column($rs, "uid");
            $us = Yii::$app->get("db")->createCommand("select uid,gender,adminflag,mobile,type from {{%user}} where uid in (" . implode(",", $uids) . ")")->queryAll();
            if ($us && is_array($us)) {
                foreach ($us as $u) {
                    $users[$u["uid"]] = $u;
                }
            }
        }
        $content = [];
        foreach ($rs as $k => $v) {
            $data = AppHelper::decodeJson($v["data"], true);

            // order status
            $status = "deleted";
            switch ($v["status"]) {
                case 0: $status = "closed"; break;
                case 1: $status = "order placed"; break;
                case 2: $status = "payment being confirmed"; break;
                case 3: $status = "paid"; break;
                case 4: $status = "shipped"; break;
                case 5: $status = "receipt confirmed"; break;
                case 6: $status = "reviewed"; break;
                case 7: $status = "paid amount differs from order amount"; break;
            }

            // refund status
            $refund = "no refund requested";
            if (isset($v["refund"])) {
                switch ($v["refund"]) {
                    case 0: $refund = "not requested"; break;
                    case 1: $refund = "refund requested"; break;
                    case 2: $refund = "refund rejected"; break;
                    case 3: $refund = "refund approved"; break;
                    case 4: $refund = "confirmed by seller, manual handling required"; break;
                    case 5: $refund = "refund credited"; break;
                    case 6: $refund = "return and refund requested"; break;
                    case 7: $refund = "return approved"; break;
                    case 8: $refund = "return rejected"; break;
                    case 9: $refund = "return shipped by buyer"; break;
                    case 10: $refund = "return received by seller"; break;
                    case 11: $refund = "goods received, refund rejected"; break;
                    case 12: $refund = "return refund credited"; break;
                }
            }

            $gender = "unknown";
            if (isset($users[$v["uid"]]) && $users[$v["uid"]]["gender"] == 1) {
                $gender = "male";
            } else if (isset($users[$v["uid"]]) && $users[$v["uid"]]["gender"] == 2) {
                $gender = "female";
            }

            $type = "regular user";
            if (isset($users[$v["uid"]]) && $users[$v["uid"]]["adminflag"] == 3) {
                $type = "craftsman under review";
            } else if (isset($users[$v["uid"]]) && $users[$v["uid"]]["adminflag"] == 2) {
                $type = "seed user";
            } else if (isset($users[$v["uid"]]) && $users[$v["uid"]]["adminflag"] == 1) {
                $type = "administrator";
            }

            $itype = "not set / in stock";
            if (isset($data["type"])) {
                if ($data["type"] == 1) {
                    $itype = "in stock";
                } else if ($data["type"] == 2) {
                    $itype = "custom-made";
                } else {
                    $itype = "auction";
                }
            }

            $utype = isset($users[$v["uid"]]["type"]) && $users[$v["uid"]]["type"] == 4 ? "registered via WeChat purchase" : "registered in app";
            $otype = !$v["otype"] ? "purchased in app" : "purchased via WeChat";
            $paytype = !$v["prepaytype"] ? "paid in app" : "paid via WeChat";
            $snapshot = AppHelper::getData(Yii::$app->params["imageServer"] . $v["snapshot"]);

            // one export row per order
            $content[] = [
                date("Y/m/d H:i:s", floor($v["createtm"] / 1000)),
                $v["ooid"],
                isset($snapshot["item"]["pid"]) ? $snapshot["item"]["pid"] : "",
                $v["iid"],
                $data["title"],
                $itype,
                (isset($v["parentCategory"]) ? $v["parentCategory"] . "/" : "") . $v["category"],
                $v["craftsman"],
                $v["suid"],
                $v["quantity"],
                $v["username"],
                $utype,
                $v["uid"],
                $data["address"],
                $status,
                $refund,
                $data["price"],
                $v["realpay"],
                $otype,
                $paytype,
                isset($users[$v["uid"]]["mobile"]) ? $users[$v["uid"]]["mobile"] : "unknown",
                $gender,
                $type,
            ];
        }
        return $content;
    }

    public function mkDir($path)
    {
        if (is_dir($path)) {
            echo "Directory " . $path . " already exists!";
            return true;
        } else {
            $res = mkdir($path, 0777, true);
            echo $res ? "Directory created" : "Failed to create directory";
            return $res;
        }
    }
}
}
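The paging logic in `getData` above (ask for the total, then request `ceil(count / perpage)` pages of 500 rows each) carries over directly to the Java side. The sketch below shows the same arithmetic with integer ceiling division. `Paging`, `pageCount`, and `pageQueries` are hypothetical names for illustration, and the `page` and `perpage` parameter names are taken from the PHP listing.

```java
import java.util.ArrayList;
import java.util.List;

public class Paging {

    /** Number of pages needed for total rows at perPage rows each (ceiling division). */
    public static int pageCount(int total, int perPage) {
        if (perPage <= 0) {
            throw new IllegalArgumentException("perPage must be positive");
        }
        if (total <= 0) {
            return 0;
        }
        return (total + perPage - 1) / perPage;
    }

    /** Query strings for pages 1..pageCount, mirroring the page/perpage parameters of the PHP code. */
    public static List<String> pageQueries(int total, int perPage) {
        List<String> queries = new ArrayList<>();
        int pages = pageCount(total, perPage);
        for (int page = 1; page <= pages; page++) {
            queries.add("page=" + page + "&perpage=" + perPage);
        }
        return queries;
    }
}
```

With 100,000 rows and 500 rows per page this yields 200 sequential requests, which is why the per-request timeout matters so much in the PHP version.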

Gearman client code:

    // (the start of this listing, the function declaration that receives $str, is missing from the source)
    $client = new GearmanClient();
    $client->addServer("127.0.0.1", 4730);

    $client->setCompleteCallback("completeCallBack");
    $result2 = $client->doBackground("export", $str); // asynchronous; returns only the job handle

//        $result1 = $client->do("export", "do"); // do() is synchronous: it runs the job and returns the result
//        $result3 = $client->addTask("export", "addTask"); // adds a task to the queue (synchronous?); tasks can have callbacks
//        $result4 = $client->addTaskBackground("export", "addTaskBackground"); // adds a background task to the queue (asynchronous?)

    $client->runTasks(); // runs the queued tasks; the do* family does not need runTasks()
    return $result2;

}
// the complete callback only takes effect for tasks added with addTask
function completeCallBack($task)
{
    echo "CompleteCallback! handle result:" . $task->data() . PHP_EOL;
}

PS: To run the code above, install the Gearman server on your server or local machine along with the PHP gearman extension. Look up an installation guide yourself.

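If your business logic is not complicated, this is more than enough to export tens of thousands of rows. My problem was not solved, though: my boss did not want Gearman for the queue and preferred to have Java do the work. No matter; I enjoy hopping between technologies to solve a problem. Since this did not meet my boss's requirements, on to another plan.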

MqRocket+HDFS+POI

Note: the Java projects used here are all based on spring+dubbo/dubbox. Apart from the mapper configuration and annotations, every configuration item and annotation used belongs to Spring.

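Three projects:

mq project: provides a REST service and sends messages (@rxl)

biz project: provides dubbo and RESTful interfaces and handles the business logic (@lee)

data project: handles the data export

These three projects were written by different engineers. We do not care how each one is implemented; we only need to know how to use what it provides.

The RESTful interface provided by mq: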
@Path("/message")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
@Component("sendMessageService")
public class SendMessageImpl implements SendMessageService{

    @Resource
    public IProducer producer;

    @PUT
    @Path("send")
    @Consumes({MediaType.APPLICATION_JSON})
    @Override
    public void sendMessage(Message message) {
        System.out.println("message" + message.getMessage());
        producer.send(message.getTopic(),message.getKey(),message.getMessage());
    }
}

From the PHP backend we call this interface with a PUT request to send the data to be processed to the export service. curl can send a PUT request through its custom request option:

 curl_setopt($curl, CURLOPT_CUSTOMREQUEST, "PUT");

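Suppose the REST endpoint provided by mq is http://localhost:8018/mq/message/send. We need to pass it a JSON string that starts out as an associative array with the keys "topic", "key", and "message". topic is the message topic, which the consumer subscribes to. key is the message key; a topic can carry many keys, so the consumer (the data export side) has to dispatch on the key. message holds the actual parameters, such as which fields to export, the address of the file upload server, and so on.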

$message = [
    "topic" => "order_export",
    "key" => "order_tag_" . $orderNo,
    "message" => [
        "params" => [
            ...
        ],
        "headers" => [
            ...
        ],
        "options" => [
            ...
        ],
    ],

];

The complete request:

http://localhost:8018/mq/message/send?{"topic":"order_export","key":"order_tag_","message":{"params":[],"header":[],"options":[]}}
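For callers written in Java rather than PHP, the same envelope can be built and sent with the standard library alone. The sketch below is an illustration under assumptions: it sends the JSON as the body of the PUT request (matching the `@Consumes` annotation on the endpoint), it treats the `message` value as an already-serialized JSON fragment, and the class and method names (`MqMessageSender`, `envelope`, `send`) are hypothetical. The `quote` helper only escapes backslashes and double quotes, which is enough for simple topic and key strings but is not a general JSON encoder.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class MqMessageSender {

    /** Wraps s in double quotes, escaping backslashes and double quotes only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the topic/key/message envelope; messageJson must already be valid JSON. */
    public static String envelope(String topic, String key, String messageJson) {
        return "{\"topic\":" + quote(topic)
                + ",\"key\":" + quote(key)
                + ",\"message\":" + messageJson + "}";
    }

    /** Sends body as the payload of a PUT request and returns the HTTP status code. */
    public static int send(String endpoint, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }
}
```

A call would look like `MqMessageSender.send("http://localhost:8018/mq/message/send", MqMessageSender.envelope("order_export", "order_tag_1", "{\"params\":[]}"))`, where the key suffix is a made-up order number.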

POI utility class

Java has many Excel APIs, but Apache POI is the most convenient and flexible one I know of (I have not tried the others).

HSSF is the POI Project's pure Java implementation of the Excel '97(-2007) file format. XSSF is the POI Project's pure Java implementation of the Excel 2007 OOXML (.xlsx) file format.

HSSF and XSSF provides ways to read spreadsheets create, modify, read and write XLS spreadsheets. They provide:

low level structures for those with special needs

an eventmodel api for efficient read-only access

a full usermodel api for creating, reading and modifying XLS files
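One practical difference between the two formats matters for exports of this size: an .xls sheet (HSSF) holds at most 65,536 rows, while an .xlsx sheet (XSSF) holds up to 1,048,576. A report with more than a hundred thousand rows therefore does not fit on a single .xls sheet. The sketch below computes how many sheets an export would need; `SheetPlanner` and `sheetsNeeded` are hypothetical names, and the code deliberately uses no POI classes so that it runs on its own.

```java
public class SheetPlanner {
    public static final int XLS_MAX_ROWS = 65_536;      // row limit of an HSSF (.xls) sheet
    public static final int XLSX_MAX_ROWS = 1_048_576;  // row limit of an XSSF (.xlsx) sheet

    /**
     * Sheets needed to hold dataRows rows when every sheet also carries
     * headerRows header rows. An empty export still needs one sheet.
     */
    public static int sheetsNeeded(long dataRows, int headerRows, boolean xlsx) {
        int capacity = (xlsx ? XLSX_MAX_ROWS : XLS_MAX_ROWS) - headerRows;
        if (capacity <= 0) {
            throw new IllegalArgumentException("headerRows leaves no room for data");
        }
        if (dataRows <= 0) {
            return 1;
        }
        // ceiling division: a partially filled last sheet still counts
        return (int) ((dataRows + capacity - 1) / capacity);
    }
}
```

For 100,000 data rows with one header row per sheet, this gives two sheets for .xls and one for .xlsx, which is one reason to prefer .xlsx or to split the data across sheets when the export is large.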

Add the POI dependencies in Gradle:

// java excel api
compile "org.apache.poi:poi:3.10.1"
compile "org.apache.poi:poi-ooxml:3.10.1" // keep poi and poi-ooxml on the same version

package cn.test.web.utils;

import cn.test.util.Utils;
import org.apache.commons.io.FilenameUtils;
import org.apache.poi.hssf.record.crypto.Biff8EncryptionKey;
import org.apache.poi.hssf.usermodel.HSSFFont;
import org.apache.poi.hssf.usermodel.HSSFFooter;
import org.apache.poi.hssf.usermodel.HSSFHeader;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.apache.poi.poifs.filesystem.POIFSFileSystem;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.Font;
import org.apache.poi.ss.usermodel.Footer;
import org.apache.poi.ss.usermodel.Header;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 下午5:02
 */
public class POIUtils {
    private static final short HEADER_FONT_SIZE = 16; // font size of the page header title
    private static final short FONT_HEIGHT_IN_POINTS = 14; // font size of the header row

    public static Workbook createWorkbook(String file) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        switch (ext) {
            case "xls":
                wb = createHSSFWorkbook();
                break;
            case "xlsx":
                wb = createXSSFWorkbook();
                break;
            default:
                wb = createHSSFWorkbook();
        }
        return wb;
    }

    public static Workbook createWorkbookByIS(String file, InputStream inputStream) {
        String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
        Workbook wb = null;
        try {
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(inputStream);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(inputStream);
                    break;
                default:
                    wb = new HSSFWorkbook(inputStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    public static Workbook writeFile(Workbook wb, String file) {
        if (wb == null || Utils.isEmpty(file)) {
            return null;
        }
        FileOutputStream out = null;
        try {
            out = new FileOutputStream(file);
            wb.write(out);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return wb;
    }

    public static Workbook createHSSFWorkbook() {
        // create the Workbook
        HSSFWorkbook wb = new HSSFWorkbook();
        // add a sheet (an .xls file without any sheet reports an error when opened)
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook createXSSFWorkbook() {
        XSSFWorkbook wb = new XSSFWorkbook();
        @SuppressWarnings("unused")
        Sheet sheet = wb.createSheet();
        return wb;
    }

    public static Workbook openWorkbook(String file) {
        FileInputStream in = null;
        Workbook wb = null;

        try {
            in = new FileInputStream(file);
            wb = WorkbookFactory.create(in);
        } catch (InvalidFormatException | IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return wb;
    }

    public static Workbook openEncryptedWorkbook(String file, String password) {
        FileInputStream input = null;
        BufferedInputStream binput = null;
        POIFSFileSystem poifs = null;
        Workbook wb = null;
        try {
            input = new FileInputStream(file);
            binput = new BufferedInputStream(input);
            poifs = new POIFSFileSystem(binput);
            Biff8EncryptionKey.setCurrentUserPassword(password);
            String ext = FilenameUtils.getExtension(CommonUtils.getFileName(file));
            switch (ext) {
                case "xls":
                    wb = new HSSFWorkbook(poifs);
                    break;
                case "xlsx":
                    wb = new XSSFWorkbook(input);
                    break;
                default:
                    wb = new HSSFWorkbook(poifs);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return wb;
    }

    /**
     * Appends a sheet; if wb is null and isNew is true, creates a new workbook
     *
     * @param wb
     * @param isNew
     * @param type  workbook type to create, only used when isNew is true: 1 = xls, 2 = xlsx
     * @return
     */
    public static Workbook appendSheet(Workbook wb, boolean isNew, int type) {
        if (wb != null) {
            Sheet sheet = wb.createSheet();
        } else if (isNew) {
            if (type == 1) {
                wb = new HSSFWorkbook();
                wb.createSheet();
            } else {
                wb = new XSSFWorkbook();
                wb.createSheet();
            }
        }
        return wb;
    }


    public static Workbook setSheetName(Workbook wb, int index, String sheetName) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.setSheetName(index, sheetName);
        }
        return wb;
    }

    public static Workbook removeSheet(Workbook wb, int index) {
        if (wb != null && wb.getSheetAt(index) != null) {
            wb.removeSheetAt(index);
        }
        return wb;
    }

    public static Workbook insert(Workbook wb, String sheetName, int row, int start,
                                  List columns) {
        if (row == 0 || wb == null) return wb;
        for (int i = start; i < (row + start); i++) {
            Row rows = wb.getSheet(sheetName).createRow(i);
            if (columns != null && columns.size() > 0) {
                for (int j = 0; j < columns.size(); j++) {
                    Cell cell = rows.createCell(j);
                    cell.setCellValue(String.valueOf(columns.get(j)));
                }
            }
        }
        return wb;
    }

    /**
     * Sets the header row of the sheet
     *
     * @param wb
     * @param sheetName
     * @param columns   for example: ["Country", "Event type", "Year"]
     * @return
     */
    public static Workbook setHeader(Workbook wb, String sheetName, List columns) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        return setHeaderStyle(insert(wb, sheetName, 1, 0, columns), sheetName);

    }

    /**
     * Inserts data rows
     *
     * @param wb        Workbook
     * @param sheetName sheet name; defaults to the first sheet
     * @param start     index of the first row to write
     * @param data      the data as a List of Lists, for example: [["China","Olympics",2008],["London","Olympics",2012]]
     * @return
     */
    public static Workbook setData(Workbook wb, String sheetName, int start,
                                   List data) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        if (data != null && data.size() > 0) {
            int s = start;
            for (Object columns : data) {
                // write exactly one row per data item
                insert(wb, sheetName, 1, s, (List) columns);
                s++;
            }
        }
        return wb;
    }

    /**
     * Removes a row
     *
     * @param wb
     * @param sheetName sheet name
     * @param row       row index
     * @return
     */
    public static Workbook delRow(Workbook wb, String sheetName, int row) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Row r = wb.getSheet(sheetName).getRow(row);
        wb.getSheet(sheetName).removeRow(r);
        return wb;
    }

    /**
     * Moves rows
     *
     * @param wb
     * @param sheetName
     * @param start     first row to move
     * @param end       last row to move
     * @param step      number of rows to shift by; a negative value shifts up
     *                  moveRow(wb,null,2,3,5); shifts rows 2 and 3 down by 5 rows
     *                  moveRow(wb,null,2,3,-1); shifts rows 2 and 3 up by 1 row
     * @return
     */
    public static Workbook moveRow(Workbook wb, String sheetName, int start, int end, int step) {
        if (wb == null) return null;
        if (sheetName == null) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        wb.getSheet(sheetName).shiftRows(start, end, step);
        return wb;
    }

    public static Workbook setHeaderStyle(Workbook wb, String sheetName) {
        Font font = wb.createFont();
        CellStyle style = wb.createCellStyle();
        font.setBoldweight(HSSFFont.BOLDWEIGHT_BOLD);
        font.setFontHeightInPoints(FONT_HEIGHT_IN_POINTS);
        font.setFontName("黑體");
        style.setFont(font);
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        int row = wb.getSheet(sheetName).getFirstRowNum();
        int cell = wb.getSheet(sheetName).getRow(row).getLastCellNum();
        for (int i = 0; i < cell; i++) {
            wb.getSheet(sheetName).getRow(row).getCell(i).setCellStyle(style);
        }
        return wb;
    }

    public static Workbook setHeaderOutline(Workbook wb, String sheetName, String title) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Header header = wb.getSheet(sheetName).getHeader();
        header.setLeft(HSSFHeader.startUnderline() +
                HSSFHeader.font("宋體", "Italic") +
                "Your slogan here!" +  // for example, a company motto
                HSSFHeader.endUnderline());
        header.setCenter(HSSFHeader.fontSize(HEADER_FONT_SIZE) +
                HSSFHeader.startDoubleUnderline() +
                HSSFHeader.startBold() +
                title +
                HSSFHeader.endBold() +
                HSSFHeader.endDoubleUnderline());
        header.setRight("Time: " + HSSFHeader.date() + " " + HSSFHeader.time());
        return wb;
    }

    public static Workbook setFooter(Workbook wb, String sheetName, String copyright) {
        if (wb == null) return null;
        if (Utils.isEmpty(sheetName)) {
            sheetName = wb.getSheetAt(0).getSheetName();
        }
        Footer footer = wb.getSheet(sheetName).getFooter();
        if (Utils.isEmpty(copyright)) {
            copyright = "People's Republic of China"; // copyright holder: use your own company or app name
        }
        footer.setLeft("Copyright @ " + copyright);
        footer.setCenter("Page:" + HSSFFooter.page() + " / " + HSSFFooter.numPages());
        footer.setRight("File:" + HSSFFooter.file());
        return wb;
    }

    public static String create(String sheetNm, String file, List header, List data, String title, String copyright) {
        Workbook wb = createWorkbook(file);
        if (Utils.isEmpty(sheetNm)) {
            sheetNm = wb.getSheetAt(0).getSheetName();
        }
        setHeaderOutline(wb, sheetNm, title);
        setHeader(wb, sheetNm, header);
        setData(wb, sheetNm, 1, data);
        setFooter(wb, sheetNm, copyright);
        writeFile(wb, file);
        if (wb != null) {
            return file;
        }
        return null;
    }

    public static String getSystemFileCharset() {
        Properties pro = System.getProperties();
        return pro.getProperty("file.encoding");
    }
    // TODO add more settings later

}

HDFS utility class

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has much in common with existing distributed file systems, but the differences are also significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost machines. It provides high-throughput access to data and is well suited to applications with large data sets. HDFS relaxes some POSIX constraints to enable streaming reads of file system data. It was originally built as infrastructure for the Apache Nutch web search engine project, and it is part of the Apache Hadoop Core project.

In short, HDFS is fault-tolerant and designed for low-cost hardware, it offers high-throughput access to application data for applications with very large data sets, and it relaxes POSIX requirements so that file system data can be accessed as a stream (streaming access).

Add the HDFS dependencies in Gradle:

    // jersey
    compile "com.sun.jersey:jersey-core:1.19.1"
    compile "com.sun.jersey:jersey-server:1.19.1"
    compile "com.sun.jersey:jersey-client:1.19.1"
    compile "com.sun.jersey:jersey-json:1.19.1"

    // hadoop
    compile ("org.apache.hadoop:hadoop-common:2.7.2") {
        exclude(module: "jersey")
        exclude(module: "contribs")
    }
    compile ("org.apache.hadoop:hadoop-hdfs:2.7.2") {
        exclude(module: "jersey")
        exclude(module: "contribs")
    }
    compile ("org.apache.hadoop:hadoop-client:2.7.2") {
        exclude(module: "jersey")
        exclude(module: "contribs")
    }

package cn.test.web.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.poi.ss.usermodel.Workbook;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 下午5:41
 */
public class HDFSUtils {
    private static FileSystem fs = null;

    public static FileSystem getFileSystem(Configuration conf) throws IOException,
            URISyntaxException {
        fs = FileSystem.get(conf);
        //fs = FileSystem.newInstance(conf);
        return fs;
    }

    /**
     * Checks whether a path exists
     *
     * @param conf
     * @param path
     * @return
     * @throws IOException
     */
    public static boolean exits(Configuration conf, String path) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        return fs.exists(new Path(path));
    }

    /**
     * Creates a file from a byte array
     *
     * @param conf
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public static void createFile(Configuration conf, String filePath, byte[] contents)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents, 0, contents.length);
        outputStream.hflush();
        outputStream.close();
        fs.close();
    }

    /**
     * Create a file from a string
     *
     * @param conf
     * @param filePath
     * @param fileContent
     * @throws IOException
     */
    public static void createFile(Configuration conf, String fileContent, String filePath)
            throws IOException, URISyntaxException {
        // encode explicitly as UTF-8, matching the charset readFile uses when decoding
        createFile(conf, filePath, fileContent.getBytes("UTF-8"));
    }

    /**
     * Upload a local file (the local source is deleted after the copy; an existing target is overwritten)
     *
     * @param conf
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(true, true, localPath, remotePath);
        fs.close();
    }

    /**
     * Delete a directory or file
     *
     * @param conf
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }

    /**
     * Delete a directory or file (recursively, if it has subdirectories)
     *
     * @param conf
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath)
            throws IOException, URISyntaxException {
        return deleteFile(conf, remoteFilePath, true);
    }

    /**
     * Rename a file
     *
     * @param conf
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(Configuration conf, String oldFileName, String newFileName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }

    /**
     * Create a directory
     *
     * @param conf
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(Configuration conf, String dirName)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path dir = new Path(dirName);
        boolean result = fs.mkdirs(dir);
        fs.close();
        return result;
    }

    /**
     * List all files under the given path (directories are not included)
     *
     * @param fs
     * @param basePath
     * @param recursive
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, String basePath, boolean recursive)
            throws IOException {
        return fs.listFiles(new Path(basePath), recursive);
    }

    /**
     * List the files under the given path (non-recursive)
     *
     * @param conf
     * @param basePath
     * @return
     * @throws IOException
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(Configuration conf, String basePath)
            throws IOException, URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        // Do not close fs here: the iterator may fetch entries lazily and still needs it.
        return fs.listFiles(new Path(basePath), false);
    }

    /**
     * List the files and subdirectories directly under the given directory (non-recursive)
     *
     * @param conf
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath));
        fs.close();
        return fileStatuses;
    }


    /**
     * Read a file and write its contents to the output stream
     *
     * @param conf configuration
     * @param filePath file path
     * @param os output stream
     * @return
     * @throws IOException
     */
    public static void readFile(Configuration conf, String filePath, OutputStream os) throws IOException,
            URISyntaxException {
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        try (FSDataInputStream inputStream = fs.open(path)) {
            Workbook wb = POIUtils.createWorkbookByIS(filePath, inputStream);
            wb.write(os);
            inputStream.close();
        } finally {
            fs.close();
        }
    }

    /**
     * Read a file and return its contents as a UTF-8 string
     * @param conf
     * @param filePath
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static String readFile(Configuration conf, String filePath) throws IOException,
            URISyntaxException {
        String fileContent = null;
        FileSystem fs = getFileSystem(conf);
        Path path = new Path(filePath);
        InputStream inputStream = null;
        ByteArrayOutputStream outputStream = null;
        try {
            inputStream = fs.open(path);
            outputStream = new ByteArrayOutputStream(inputStream.available());
            IOUtils.copyBytes(inputStream, outputStream, conf);
            byte[] lens = outputStream.toByteArray();
            fileContent = new String(lens, "UTF-8");
        } finally {
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
        return fileContent;
    }
}

For HDFS I also wrote two separate classes: HDFSFileUploader and Configuration. As the names suggest, the former handles file uploads and the latter holds the HDFS configuration.

HDFSFileUploader
package cn.test.web.utils.hadoop;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.common.util.Utils;
import cn.test.web.utils.HDFSUtils;
import org.apache.commons.lang.NullArgumentException;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/11
 * Time 5:42 PM
 */
public class HDFSFileUploader {
    public static final byte FROM_LOCAL_COPY = 1; // upload a file from the local disk
    public static final byte FROM_CONTENT_WRITE = 2; // create the file from a string or bytes

    private static final Log LOGGER = LogFactory.getLog(HDFSFileUploader.class);
    private static final String HDFS_SCHEMA = "hdfs://";
    private static final String SEPARATOR = "/";
    private static final String SUFFIX_PREFIX = ".";

    private static final int BUFFER_SIZE = 1024;
    private static final Configuration CONF = new Configuration();


    /**
     * Upload binary content using the default configured domain and a random file name
     *
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String path, String suffix, byte[] contents) {
        return upload(null, path, suffix, contents);
    }

    /**
     * Upload binary content with a random file name
     *
     * @param domain
     * @param path
     * @param suffix
     * @param contents
     * @return
     */
    public static String upload(String domain, String path, String suffix, byte[] contents) {
        return upload(domain, path, null, suffix, contents);
    }

    /**
     * Upload binary content with an explicit file name (the bytes are written directly, not copied from a local file)
     *
     * @param domain
     * @param path
     * @param filename
     * @param suffix
     * @param content
     * @return
     */
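    public static String upload(String domain, String path, String filename, String suffix,
                                final byte[] content) {
        // Write the bytes as-is; converting them to a String first would corrupt binary data.
        String filePath = getRealAddr(domain, path, suffix, filename);
        try {
            HDFSUtils.createFile(CONF, filePath, content);
            return filePath;
        } catch (IOException | URISyntaxException e) {
            LOGGER.warn("file upload failed: {}", e.getMessage());
        }
        return null;
    }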

    /**
     * Upload a file using the default domain and a random file name
     *
     * @param path
     * @param suffix
     * @param src
     * @return
     */
    public static String upload(String path, String suffix, String src, byte fromLocal) {
        return upload(null, path, suffix, src, fromLocal);
    }

    /**
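     * Upload a file to the given directory on the given domain, with a random file name
     *
     * @param domain domain, e.g. 10.25.126.28:9000
     * @param path   directory path, e.g. /usr/local/com.hd.test/2016-08-08/
     * @param suffix file suffix, e.g. .xls or xls
     * @param src    file content as a string, or a local file path
     * @return String the full file path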
     */
    public static String upload(String domain, String path, String suffix, String src, byte
            fromLocal) {
        return upload(domain, path, null, suffix, src, fromLocal);
    }

    /**
     * Upload a file with an explicit domain, path, file name, and suffix
     *
     * @param domain   domain
     * @param path     path
     * @param filename file name
     * @param suffix   suffix
     * @param src      content, or a local file path
     * @return
     */
    public static String upload(String domain, String path, String filename, String suffix, String
            src, byte fromLocal) {
        String filePath = getRealAddr(domain, path, suffix, filename);
        System.out.println(filePath);
        try {
            switch (fromLocal) {
                case FROM_LOCAL_COPY:
                    HDFSUtils.copyFromLocalFile(CONF, src, filePath);
                    break;
                case FROM_CONTENT_WRITE:
                    HDFSUtils.createFile(CONF, src, filePath);
                    break;
            }
            return filePath;
        } catch (IOException | URISyntaxException e) {
            LOGGER.warn("file upload failed: {}", e.getMessage());
        }
        return null;
    }

    /**
     * Build the full path of the file
     *
     * @param domain   domain
     * @param path     directory path
     * @param suffix   suffix
     * @param filename file name
     * @return
     */
    private static String getRealAddr(String domain, String path, String suffix, String filename) {
        // keep a domain that already carries the hdfs:// scheme instead of discarding it
        if (Utils.isEmpty(domain)) {
            domain = "";
        } else if (!domain.startsWith(HDFS_SCHEMA)) {
            domain = HDFS_SCHEMA + domain;
        }
        path = getPath(path);
        filename = getFilename(filename, suffix);
        return String.format("%s%s%s", domain, path, filename);

    }

    /**
     * Normalize the directory path so that it starts and ends with "/"
     *
     * @param path
     * @return
     */
    private static String getPath(String path) {
        if (Utils.isEmpty(path)) {
            throw new NullArgumentException("path");
        }
        if (!path.startsWith(SEPARATOR)) {
            path = SEPARATOR + path;
        }
        if (!path.endsWith(SEPARATOR)) {
            path = path + SEPARATOR;
        }
        return path;
    }

    /**
     * Build the file name, appending the suffix if needed
     *
     * @param filename
     * @param suffix
     * @return
     */
    private static String getFilename(String filename, String suffix) {
        if (Utils.isEmpty(filename)) {
            filename = generateFilename();
        }
        if (!Utils.isEmpty(suffix)) {
            filename = suffix.equals(SEPARATOR) ? filename : (filename.endsWith(suffix) ?
                    filename : ((filename.endsWith(SUFFIX_PREFIX)
                    || suffix.startsWith(SUFFIX_PREFIX)) ? filename + suffix
                    : filename + SUFFIX_PREFIX + suffix));
        }
        return filename;
    }

    /**
     * Generate a random file name
     *
     * @return
     */
    private static String generateFilename() {
        return getUuid(false);
    }

    /**
     * Generate a UUID string
     *
     * @param isNeedHyphen whether to keep the hyphens
     * @return
     */
    public static String getUuid(boolean isNeedHyphen) {
        UUID uuid = UUID.randomUUID();
        String str = uuid.toString();
        // strip the hyphens only when the caller does not want them
        if (!isNeedHyphen) {
            str = str.replace("-", "");
        }
        return str;
    }

    public static void setConfResource(final Configuration config) {
        CONF.addResource(config);
    }
}

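HDFSFileUploader provides a family of methods for uploading different kinds of content, such as binary data and strings, plus helpers for copying a local file into HDFS and for generating UUID-based file names.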
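
The path handling described above (leading and trailing separators, an optional suffix, a random UUID name) can be tried without a cluster. The following is a minimal stand-alone sketch of that idea, not the HDFSFileUploader code itself; the class RemoteNames and its method names are made up for illustration.

```java
import java.util.UUID;

// Hypothetical stand-alone helper; the names are illustrative, not HDFSFileUploader's API.
class RemoteNames {
    /** Ensures the directory path starts and ends with "/". */
    static String normalizeDir(String dir) {
        if (dir == null || dir.isEmpty()) {
            throw new IllegalArgumentException("dir is empty");
        }
        String result = dir.startsWith("/") ? dir : "/" + dir;
        return result.endsWith("/") ? result : result + "/";
    }

    /** Appends the suffix once, accepting both "xls" and ".xls". */
    static String withSuffix(String name, String suffix) {
        if (suffix == null || suffix.isEmpty()) {
            return name;
        }
        String dotted = suffix.startsWith(".") ? suffix : "." + suffix;
        return name.endsWith(dotted) ? name : name + dotted;
    }

    /** Random file name: a UUID with the hyphens removed (32 hex characters). */
    static String randomName() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Joins directory, name (random when absent), and suffix into one remote path. */
    static String remotePath(String dir, String name, String suffix) {
        String base = (name == null || name.isEmpty()) ? randomName() : name;
        return normalizeDir(dir) + withSuffix(base, suffix);
    }
}
```

For example, RemoteNames.remotePath("cn/test/order", "report", "xls") yields /cn/test/order/report.xls.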

Configuration
package cn.test.web.utils.hadoop;

import cn.test.web.utils.CommonUtils;
import org.apache.commons.io.FilenameUtils;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
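 * Time 9:30 AM
 * Suggested usage:
 * <pre>{@code
 * <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration">
 *     <property name="resources">
 *         <list>
 *             <value>classpath:/spring/core-site.xml</value>
 *         </list>
 *     </property>
 * </bean>
 * }</pre>
 * Then inject hadoopConfig wherever it is needed:
 *
 * {@code @Resource private Configuration hadoopConfig;}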
 */
public class Configuration extends org.apache.hadoop.conf.Configuration {
    private Resource[] resources;

    public void setResources(List<String> filenames) throws IOException {
        List<Resource> resources = new ArrayList<>();
        if (filenames != null && filenames.size() > 0) {
            for (String filename : filenames) {
                filename = filename.trim();
                String realName = getFileName(filename);
                String ext = FilenameUtils.getExtension(realName);
                if (ext.equals("xml")) {
                    PathMatchingResourcePatternResolver pathMatchingResourcePatternResolver =
                            new PathMatchingResourcePatternResolver();
                    try {
                        Resource[] resourceList = pathMatchingResourcePatternResolver.getResources(filename);
                        Collections.addAll(resources, resourceList);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        for (Resource resource : resources) {
            this.addResource(resource.getURL());
        }
    }

    private String getFileName(String fileName) {
        return CommonUtils.getFileName(fileName);
    }
}

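This class is simple: it extends Hadoop's org.apache.hadoop.conf.Configuration so that the Hadoop configuration files can be specified flexibly in the Spring configuration. It relies on the addResource method of org.apache.hadoop.conf.Configuration (the URL overload in the code above). The Spring XML configuration looks like this:

    <!-- Hadoop configuration; the bean id matches the injected field name hadoopConfig -->
    <bean id="hadoopConfig" class="cn.test.web.utils.hadoop.Configuration">
        <property name="resources">
            <list>
                <value>classpath:META-INF/hadoop/*.xml</value>
            </list>
        </property>
    </bean>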
Handling the order export (MQ consumer)
package cn.test.web.mq.consumer;
... // many imports omitted

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/9
 * Time 2:14 PM
 */
public class OrderExportHandler implements IMessageHandler {

    private static final Log LOGGER = LogFactory.getLog(OrderExportHandler.class);
    private static final int MUL_SEC = 1000;
    private static final Gson GSON = new Gson();
    
    @Value("${image_server}") 
    private String imageServer;
    @Autowired
    private DataManager manager;
 
    @Override
    public void handle(final String key, final String message) {
        System.out.println("message" + message);
        Pattern p = Pattern.compile("-");
        String[] skey = p.split(key);
        if (skey.length < 3) {
            return;
        }
        int res = insert(skey[1], skey[0], skey[2]);
        LOGGER.debug("primary key: {}", res);
        if (res > 0) {
            // the insert succeeded; run the export logic
            Map data = manager.parseData(message);
            List header = null;
            List content = null;
            List orders = null;

            DataExportLog log = new DataExportLog();
            log.setDelid(res);
            log.setUid(Integer.valueOf(skey[2]));

            if (data.containsKey("params")) {
                LOGGER.debug("params:{}", data.get("params"));
                orders = manager.getOrders(data.get("params"));
                LOGGER.debug("rows to export: {}", orders.size());
            }
            if (orders == null || orders.size() == 0) {
                log.setStatus((byte) 4);
            } else if (data.containsKey("header") && (data.get("header") instanceof Map)) {
                Object obj = data.get("header");
                Map map = (obj instanceof Map) ?
                        manager.parseHeader((Map) obj) : null;

                if (map != null && map.size() > 0) {
                    if (map.containsKey("header")) {
                        header = getHeader(map.get("header"));
                    }
                    if (map.containsKey("key")) {
                        content = getContent(orders, map.get("key"));
                    }
                }
                // call the HDFS helper to upload the file
                if (!Utils.isEmpty(header) || !Utils.isEmpty(content)) {
                    // generate the Excel file
                    String fName = getFilename(data);
                    String localFile = manager.writeExecelFile(fName, header, content, null, null);
                    String file = manager.copyFileFromLocal(skey[0], localFile);

                    if (Utils.isEmpty(localFile) || Utils.isEmpty(file)) {
                        log.setStatus((byte) 3);
                    } else {
                        log.setStatus((byte) 1);
                        log.setLink(file);
                    }
                    LOGGER.info("local temporary file: {}", localFile);
                    LOGGER.info("file uploaded to the Hadoop server: {}", file);
                }

            }
            update(log);
        }
    }
    
    // TODO
    // Data processing; this calls the biz project's dubbo interfaces.
    // The details are omitted here.
    
}

The order-export logic lives in the class above and in DataManager. Fetching the data goes through the dubbo interfaces of the biz project; the business logic itself is not covered here.
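
handle() expects the message key to contain at least three parts separated by "-", and later converts the third part with Integer.valueOf, which throws on a non-numeric value. A small sketch of validating the key up front is shown below. ExportKey and its field names are hypothetical, and the meaning of the second part is a guess because the article does not state it.

```java
import java.util.Optional;

// Hypothetical value object; field names are guesses based on how handle() uses skey[0..2].
final class ExportKey {
    final String type; // skey[0]: also used as the HDFS sub-directory
    final String ref;  // skey[1]: meaning not stated in the article
    final int uid;     // skey[2]: parsed as the user id

    private ExportKey(String type, String ref, int uid) {
        this.type = type;
        this.ref = ref;
        this.uid = uid;
    }

    /** Returns an empty Optional instead of throwing when the key is malformed. */
    static Optional<ExportKey> parse(String key) {
        if (key == null) {
            return Optional.empty();
        }
        String[] parts = key.split("-");
        if (parts.length < 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new ExportKey(parts[0], parts[1], Integer.parseInt(parts[2])));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

With this shape the handler could return early on an empty result rather than failing halfway through the export.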

Here is the code for manager.writeExecelFile(fName, header, content, null, null) and manager.copyFileFromLocal(skey[0], localFile):

public String writeExecelFile(String filename, List header, List datas, String title, String copyright) {
    SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd");
    String date = sd.format(new Date());
    if (Utils.isEmpty(filename)) {
        filename = HDFSFileUploader.getUuid(true) + this.ext;
    }
    String filePath = this.tmpDir + "/" + date + "/" + filename;
    // collapse duplicate separators
    filePath = filePath.replaceAll("//", "/");
    File f = new File(CommonUtils.getFilePath(filePath));
    if (!f.exists()) {
        // mkdirs() also creates missing parent directories such as the per-date folder
        f.mkdirs();
    }
    if (Utils.isEmpty(title)) {
        title = DEFAULT_TITLE;
    }
    if (Utils.isEmpty(copyright)) {
        copyright = this.copyright;
    }
    return POIUtils.create(null, filePath, header, datas, title, copyright);
}

writeExecelFile calls POIUtils.create, which is built on POI; once it returns, the temporary file has been generated.
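
Creating the per-date directory is the step most likely to fail silently in writeExecelFile, because File.mkdir() only reports failure through its return value. As a rough alternative, assuming Java 8 or later, java.nio.file.Files.createDirectories creates all missing parents and reports failures as exceptions. TempFiles and prepare are hypothetical names, not part of the project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;

// Hypothetical helper: resolves <tmpDir>/<yyyy-MM-dd>/<filename> and creates the directories.
class TempFiles {
    static Path prepare(String tmpDir, LocalDate date, String filename) {
        // LocalDate.toString() is ISO-8601, i.e. yyyy-MM-dd
        Path target = Paths.get(tmpDir, date.toString(), filename);
        try {
            // no-op when the directory already exists; creates missing parents otherwise
            Files.createDirectories(target.getParent());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }
}
```

The returned path points at the file to be written; the file itself is not created by this helper.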
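One more point: values such as the temporary directory, the HDFS upload path, and the copyright notice are best kept configurable. Spring's org.springframework.beans.factory.config.PropertyPlaceholderConfigurer makes this possible: declare the fields as follows and put the matching entries in a properties file: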

    @Value("${hdfs_upload_dir}")
    private String uploadDir;

    @Value("${file_tmp_dir}")
    private String tmpDir;

    @Value("${copyright}")
    private String copyright;

    @Value("${default_file_ext}")
    private String ext;

Now look at copyFileFromLocal:

    /**
     * Write the file to HDFS
     *
     * @param type
     * @param file
     * @return
     */
    public String copyFileFromLocal(String type, String file) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        String date = format.format(new Date());
        String path = this.uploadDir + type + "/" + date + "/";
        HDFSFileUploader.setConfResource(hadoopConfig);
        return HDFSFileUploader.upload(path, this.ext, file, HDFSFileUploader.FROM_LOCAL_COPY);
    }
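
The remote path built in copyFileFromLocal has the shape uploadDir + type + "/" + date + "/". A small sketch of the same layout using java.time (Java 8 or later) follows. UploadPaths is a hypothetical helper, and unlike the original it tolerates an uploadDir without a trailing slash.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring the directory layout used by copyFileFromLocal.
class UploadPaths {
    // DateTimeFormatter is immutable and thread-safe, so it can be shared as a constant
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    static String forType(String uploadDir, String type, LocalDate day) {
        String root = uploadDir.endsWith("/") ? uploadDir : uploadDir + "/";
        return root + type + "/" + DAY.format(day) + "/";
    }
}
```

For instance, UploadPaths.forType("/cn/test", "order", LocalDate.of(2016, 8, 11)) gives /cn/test/order/2016-08-11/.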

This method calls HDFSFileUploader.upload, from the wrapper class shown earlier. Note that the Hadoop configuration is passed in here via HDFSFileUploader.setConfResource(hadoopConfig);. The Hadoop Configuration is injected into the DataManager class like this:

@Resource
private Configuration hadoopConfig;

At this point the generated Excel file has been uploaded to the specified HDFS path. You can check it with the Hadoop client:

hadoop fs -ls /cn/test/order/    # the upload path
Order export (download)

For the download, the Java backend exposes a REST endpoint directly. The third-party PHP HDFS package phdfs (on GitHub) did not work smoothly for me: it failed to compile.

Here is how the endpoint is written:

package cn.test.web.impl;

import cn.test.common.log.Log;
import cn.test.common.log.LogFactory;
import cn.test.util.Utils;
import cn.test.web.manager.DataManager;
import cn.test.web.service.DownloadService;
import cn.test.web.utils.CommonUtils;
import com.alibaba.dubbo.rpc.protocol.rest.support.ContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.URISyntaxException;

/**
 * Created with test-data
 * User zhoujunwen
 * Date 16/8/16
 * Time 5:21 PM
 */
@Path("download")
@Component("downloads")
@Produces({ContentType.APPLICATION_JSON_UTF_8})
public class DownloadServiceImpl implements DownloadService {
    private static final Log LOGGER = LogFactory.getLog(DownloadServiceImpl.class);
    @Autowired
    private DataManager manager;
    @Override
    @GET
    @Path("order")
    public void down(@Context HttpServletResponse response, @QueryParam("url") String url,
                             @QueryParam("uid") Integer uid) {
        LOGGER.debug("download URL: {}", url);
        if (Utils.isEmpty(url)) {
            return;
        }
        String filename = CommonUtils.getFileName(url);
        // set the response headers (a second setContentType call would simply override the first)
        response.setContentType("application/vnd.ms-excel;charset=gb2312");
        response.setHeader("Content-Disposition", "attachment;filename=" + filename);
        try {
            // read the file from HDFS and write it to the response
            manager.readFile(url, response.getOutputStream());
            response.flushBuffer();
        } catch (IOException | URISyntaxException e) {
            LOGGER.error(e.getMessage());
        }
    }
}
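
The handler above puts the file name into Content-Disposition unquoted, which may break for names containing spaces or non-ASCII characters. One common approach, sketched here under the assumption that clients understand the RFC 6266 filename* parameter, is to send an ASCII fallback together with a percent-encoded UTF-8 name. ContentDisposition.attachment is a hypothetical helper, not part of the article's code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that builds a Content-Disposition value for a download.
class ContentDisposition {
    static String attachment(String filename) {
        String encoded;
        try {
            // URLEncoder produces form encoding: turn "+" back into %20 and escape "*"
            encoded = URLEncoder.encode(filename, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
        // plain ASCII fallback for clients that ignore filename*
        String fallback = filename.replaceAll("[^A-Za-z0-9._-]", "_");
        return "attachment; filename=\"" + fallback + "\"; filename*=UTF-8''" + encoded;
    }
}
```

In the handler this would be used as response.setHeader("Content-Disposition", ContentDisposition.attachment(filename)).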

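The PHP page only needs a hyperlink. As an optimization, all production interfaces run on the internal network, so the a tag cannot expose the interface's IP directly. Instead, nginx is configured as a proxy, and the page only has to request downloads/order?url=xxx&uid=xxx.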

location /downloads/ {
    proxy_pass http://127.0.0.1:8086/presentation/download/;
}
Pitfalls encountered
Calling the biz interface from multiple threads
public List getOrders(Object params) {
        OrderSearch search = null;
        if (params != null && (params instanceof Map)) {
            System.out.println("params:" + params);
            search = GSON.fromJson(GSON.toJson(params), OrderSearch.class);
            System.out.println("title:" + search.getTitle());
        } else {
            search = new OrderSearch();
        }
        int count = orderService.searchCount(search);
        int cycleTimes = (int) Math.ceil(count * 1.0 / TIMES_IN_SIGNEL_PROCESSOR);
        LOGGER.debug("total rows count: {}, outer loop iterations: {}", count, cycleTimes);
        // collects the results of all concurrent tasks
        List orders = new ArrayList<>();
        int page = 0;
        for (int j = 0; j < cycleTimes; j++) {
            int signel = (count > TIMES_IN_SIGNEL_PROCESSOR) ? TIMES_IN_SIGNEL_PROCESSOR : count;
            count = count - signel;
            int poolNum = (int) Math.ceil(signel * 1.0 / LIMIT);
            LOGGER.debug("thread pool size: {}", poolNum);
            // create a thread pool
            ExecutorService pool = Executors.newFixedThreadPool(poolNum);
            // create the tasks, each of which returns a value
            List<Future> list = new ArrayList<>();
            for (int i = 0; i < poolNum; i++) {
                Callable c = new OrderExportCallable(i + "", ++page, LIMIT, orderService, search);
                // submit the task and keep its Future
                Future f = pool.submit(c);
                list.add(f);
            }
            // shut down the pool (tasks already submitted still run)
            pool.shutdown();
            try {
                Thread.sleep(THREAD_SLEEP);
            } catch (InterruptedException e) {
                LOGGER.debug("interrupted while sleeping: {}", e.getMessage());
            }
            for (Future f : list) {
                // read the task's return value from its Future
                try {
                    orders.addAll((Collection) f.get());
                    LOGGER.debug(">>> task {} returned {} rows", f.toString(),
                            ((Collection) f.get()).size());
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.warn("call to the OrderService search method failed: {}", e.getMessage());
                    return null;
                }
            }

        }

        re           
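
The listing above is cut off in the source. Its paging idea (split count rows into pages of LIMIT, fetch the pages concurrently, and merge the results in order) can be sketched with invokeAll, which blocks until every task has finished, so no fixed sleep is needed. This is an illustrative stand-in rather than the author's final code: PagedFetcher is hypothetical, and fetchPage stands in for the orderService call, whose signature the article does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical helper: fetches all pages in parallel and merges them in page order.
class PagedFetcher {
    /** Pages needed for `total` rows at `limit` rows per page (ceiling division). */
    static int pageCount(int total, int limit) {
        return (total + limit - 1) / limit;
    }

    /** fetchPage receives (page number starting at 1, page size) and returns that page's rows. */
    static <T> List<T> fetchAll(int total, int limit, int threads,
                                BiFunction<Integer, Integer, List<T>> fetchPage) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<List<T>>> tasks = new ArrayList<>();
            for (int page = 1; page <= pageCount(total, limit); page++) {
                final int current = page;
                tasks.add(() -> fetchPage.apply(current, limit));
            }
            List<T> rows = new ArrayList<>();
            // invokeAll waits for all tasks and returns the futures in submission order
            for (Future<List<T>> future : pool.invokeAll(tasks)) {
                rows.addAll(future.get());
            }
            return rows;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while fetching pages", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("page fetch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

A bounded pool size (the threads argument) keeps the number of concurrent calls to the remote service under control regardless of how many pages there are.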
               
                                           
                       
                 
