成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Elasticsearch Java High Level REST Client(Bulk API

Profeel / 2573人閱讀

摘要:異步執(zhí)行還可以以異步方式執(zhí)行,以便客戶端可以直接返回,用戶需要指定如何通過(guò)將請(qǐng)求和偵聽器傳遞給異步塊方法來(lái)處理響應(yīng)或潛在故障要執(zhí)行的和在執(zhí)行完成時(shí)使用的。在每次執(zhí)行之前和之后,或者當(dāng)一個(gè)失敗時(shí),都會(huì)調(diào)用這個(gè)偵聽器。

Bulk API
Java High Level REST Client提供了Bulk處理器來(lái)幫助處理批量請(qǐng)求。
Bulk請(qǐng)求

BulkRequest可以使用一個(gè)請(qǐng)求執(zhí)行多個(gè)索引、更新和/或刪除操作。

它需要在批量請(qǐng)求中添加至少一個(gè)操作:

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));

創(chuàng)建BulkRequest。

IndexRequest添加到Bulk請(qǐng)求。

Bulk API只支持JSONSMILE編碼的文檔,提供任何其他格式的文檔都會(huì)導(dǎo)致錯(cuò)誤。

不同的操作類型可以添加到同一個(gè)BulkRequest

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));

BulkRequest添加DeleteRequest。

BulkRequest添加UpdateRequest。

使用JSON格式添加IndexRequest。

可選參數(shù)

可以選擇提供以下參數(shù):

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");

作為TimeValue等待bulk請(qǐng)求執(zhí)行的超時(shí)。

作為String等待bulk請(qǐng)求執(zhí)行的超時(shí)。

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

作為WriteRequest.RefreshPolicy實(shí)例的刷新策略。

作為String的刷新策略。

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);

設(shè)置在繼續(xù)執(zhí)行索引/更新/刪除操作之前必須活動(dòng)的碎片副本的數(shù)量。

作為ActiveShardCount提供的碎片副本的數(shù)量:可以是ActiveShardCount.ALL、ActiveShardCount.ONEActiveShardCount.DEFAULT(默認(rèn))。

request.pipeline("pipelineId");

全局pipelineId用于所有子請(qǐng)求,除非在子請(qǐng)求上重寫。

request.routing("routingId");

全局routingId用于所有子請(qǐng)求,除非在子請(qǐng)求上重寫。

BulkRequest defaulted = new BulkRequest("posts");

在所有子請(qǐng)求上使用全局索引的bulk請(qǐng)求,除非在子請(qǐng)求上重寫,這個(gè)參數(shù)是@Nullable,并只能在創(chuàng)建BulkRequest時(shí)設(shè)置。

同步執(zhí)行

當(dāng)以以下方式執(zhí)行BulkRequest時(shí),客戶端等待BulkResponse返回,然后繼續(xù)執(zhí)行代碼:

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

在高級(jí)別REST客戶端中解析REST響應(yīng)失敗、請(qǐng)求超時(shí)或類似的情況,其中沒(méi)有來(lái)自服務(wù)器的響應(yīng)的情況下,同步調(diào)用可能引發(fā)IOException。

在服務(wù)器返回4xx5xx錯(cuò)誤代碼的情況下,高級(jí)別客戶端嘗試解析響應(yīng)體錯(cuò)誤細(xì)節(jié),然后拋出一個(gè)通用的ElasticsearchException并將原始的ResponseException作為一個(gè)被抑制的異常添加到它。

異步執(zhí)行

還可以以異步方式執(zhí)行BulkRequest,以便客戶端可以直接返回,用戶需要指定如何通過(guò)將請(qǐng)求和偵聽器傳遞給異步塊方法來(lái)處理響應(yīng)或潛在故障:

client.bulkAsync(request, RequestOptions.DEFAULT, listener);

要執(zhí)行的BulkRequest和在執(zhí)行完成時(shí)使用的ActionListener。

異步方法不會(huì)阻塞并立即返回,一旦執(zhí)行完成,ActionListener將使用onResponse方法(如果執(zhí)行成功)被調(diào)用,或者使用onFailure方法(如果執(zhí)行失?。┍徽{(diào)用,失敗情況和預(yù)期的異常與同步執(zhí)行情況相同。

一個(gè)典型的bulk監(jiān)聽器是這樣的:

ActionListener listener = new ActionListener() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

onResponse當(dāng)執(zhí)行成功完成時(shí)調(diào)用。

onFailure當(dāng)整個(gè)BulkRequest失敗時(shí)調(diào)用。

Bulk響應(yīng)

返回的BulkResponse包含執(zhí)行操作的信息,允許對(duì)每個(gè)結(jié)果進(jìn)行如下迭代:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

遍歷所有操作的結(jié)果。

檢索操作的響應(yīng)(成功與否),可以是IndexResponseUpdateResponseDeleteResponse,它們都可以看作DocWriteResponse實(shí)例。

處理索引操作的響應(yīng)。

處理更新操作的響應(yīng)。

處理刪除操作的響應(yīng)。

Bulk響應(yīng)提供了一種方法來(lái)快速檢查一個(gè)或多個(gè)操作是否失?。?/p>

if (bulkResponse.hasFailures()) { 

}

如果至少有一個(gè)操作失敗,此方法將返回true

在這種情況下,需要對(duì)所有的操作結(jié)果進(jìn)行迭代,以檢查操作是否失敗,如果失敗,則檢索相應(yīng)的失?。?/p>

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}

指示給定操作是否失敗。

檢索失敗操作的失敗。

Bulk處理器

BulkProcessor提供了一個(gè)實(shí)用程序類,允許索引/更新/刪除操作在添加到處理器時(shí)透明地執(zhí)行,從而簡(jiǎn)化了Bulk API的使用。

為了執(zhí)行請(qǐng)求,BulkProcessor需要以下組件:

RestHighLevelClient

此客戶端用于執(zhí)行BulkRequest并檢索BulkResponse。

BulkProcessor.Listener

在每次執(zhí)行BulkRequest之前和之后,或者當(dāng)一個(gè)BulkRequest失敗時(shí),都會(huì)調(diào)用這個(gè)偵聽器。

然后BulkProcessor.builder方法可以用來(lái)構(gòu)建一個(gè)新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();

創(chuàng)建BulkProcessor.Listener。

beforeBulk方法在每次執(zhí)行BulkRequest之前調(diào)用。

afterBulk方法在每次執(zhí)行BulkRequest之后調(diào)用。

failure參數(shù)的afterBulk方法在BulkRequest失敗時(shí)調(diào)用。

通過(guò)從BulkProcessor.builder調(diào)用build()方法創(chuàng)建BulkProcessorRestHighLevelClient.bulkAsync()方法將用于在后臺(tái)執(zhí)行BulkRequest。

BulkProcessor.Builder提供了一些方法來(lái)配置BulkProcessor應(yīng)該如何處理請(qǐng)求執(zhí)行:

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));

根據(jù)當(dāng)前添加的操作數(shù)量設(shè)置刷新新bulk請(qǐng)求的時(shí)間(默認(rèn)為1000,使用-1禁用它)。

根據(jù)當(dāng)前添加的操作大小設(shè)置刷新新bulk請(qǐng)求的時(shí)間(默認(rèn)為5Mb,使用-1禁用)。

設(shè)置允許執(zhí)行的并發(fā)請(qǐng)求數(shù)量(默認(rèn)為1,使用0只允許執(zhí)行單個(gè)請(qǐng)求)。

設(shè)置刷新間隔,如果間隔通過(guò),則刷新任何掛起的BulkRequest(默認(rèn)為未設(shè)置)。

設(shè)置一個(gè)常量后退策略,該策略最初等待1秒并重試最多3次,有關(guān)更多選項(xiàng),請(qǐng)參見(jiàn)BackoffPolicy.noBackoff()、BackoffPolicy.constantBackoff()BackoffPolicy.exponentialBackoff()

一旦創(chuàng)建了BulkProcessor,就可以向它添加請(qǐng)求:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

請(qǐng)求將由BulkProcessor執(zhí)行,它負(fù)責(zé)為每個(gè)bulk請(qǐng)求調(diào)用BulkProcessor.Listener。

監(jiān)聽器提供訪問(wèn)BulkRequestBulkResponse的方法:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};

beforeBulk在執(zhí)行BulkRequest的每次執(zhí)行之前調(diào)用,這個(gè)方法允許知道將要在BulkRequest中執(zhí)行的操作的數(shù)量。

afterBulk在每次執(zhí)行BulkRequest之后調(diào)用,這個(gè)方法允許知道BulkResponse是否包含錯(cuò)誤。

如果BulkRequest失敗,則調(diào)用帶failure參數(shù)的afterBulk方法,該方法允許知道失敗。

將所有請(qǐng)求添加到BulkProcessor之后,需要使用兩種可用的關(guān)閉方法之一關(guān)閉它的實(shí)例。

awaitClose()方法可以用來(lái)等待,直到所有的請(qǐng)求都被處理完畢或者指定的等待時(shí)間過(guò)去:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

如果所有bulk請(qǐng)求都已完成,則該方法返回true,如果在所有bulk請(qǐng)求完成之前的等待時(shí)間已經(jīng)過(guò)去,則返回false。

close()方法可用于立即關(guān)閉BulkProcessor

這兩種方法都在關(guān)閉處理器之前刷新添加到處理器的請(qǐng)求,并且禁止向處理器添加任何新請(qǐng)求。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75436.html

相關(guān)文章

  • Elasticsearch Java REST Client(目錄)

    摘要:用于的官方高級(jí)別客戶端,基于低級(jí)別客戶端,它公開特定的方法,并負(fù)責(zé)請(qǐng)求編組和響應(yīng)反編組。入門初始化執(zhí)行請(qǐng)求讀取響應(yīng)日志記錄通用配置嗅探器在中被添加。依賴于核心項(xiàng)目,它接受與相同的請(qǐng)求參數(shù),并返回相同的響應(yīng)對(duì)象。 Elasticsearch Java REST Client Java REST Client有兩種類型: Java Low Level REST Client:用于Elast...

    roland_reed 評(píng)論0 收藏0
  • Elasticsearch Java High Level REST Client(入門)

    摘要:入門本節(jié)描述從獲取工件到在應(yīng)用程序中使用它如何開始使用高級(jí)別客戶端。保證能夠與運(yùn)行在相同主版本和大于或等于的次要版本上的任何節(jié)點(diǎn)通信。與具有相同的發(fā)布周期,將版本替換為想要的客戶端版本。 Java High Level REST Client 入門 本節(jié)描述從獲取工件到在應(yīng)用程序中使用它如何開始使用高級(jí)別REST客戶端。 兼容性 Java High Level REST Client需...

    honmaple 評(píng)論0 收藏0
  • Elasticsearch Java High Level REST Client(Exists A

    摘要:如果文檔存在,則返回,否則返回。禁用提取存儲(chǔ)的字段。異步方法不會(huì)阻塞并立即返回,完成后,如果執(zhí)行成功完成,則使用方法回調(diào),如果失敗則使用方法。的典型偵聽器如下所示執(zhí)行成功完成時(shí)調(diào)用。 Exists API 如果文檔存在,則existsAPI返回true,否則返回false。 Exists請(qǐng)求 它就像Get API一樣使用GetRequest,支持所有可選參數(shù),由于exists()只返回...

    Thanatos 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<