摘要:異步執(zhí)行還可以以異步方式執(zhí)行,以便客戶端可以直接返回,用戶需要指定如何通過(guò)將請(qǐng)求和偵聽器傳遞給異步塊方法來(lái)處理響應(yīng)或潛在故障要執(zhí)行的和在執(zhí)行完成時(shí)使用的。在每次執(zhí)行之前和之后,或者當(dāng)一個(gè)失敗時(shí),都會(huì)調(diào)用這個(gè)偵聽器。
Bulk API
Java High Level REST Client提供了Bulk處理器來(lái)幫助處理批量請(qǐng)求。Bulk請(qǐng)求
BulkRequest可以使用一個(gè)請(qǐng)求執(zhí)行多個(gè)索引、更新和/或刪除操作。
它需要在批量請(qǐng)求中添加至少一個(gè)操作:
BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts").id("1") .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts").id("2") .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts").id("3") .source(XContentType.JSON,"field", "baz"));
創(chuàng)建BulkRequest。
將IndexRequest添加到Bulk請(qǐng)求。
Bulk API只支持JSON或SMILE編碼的文檔,提供任何其他格式的文檔都會(huì)導(dǎo)致錯(cuò)誤。
不同的操作類型可以添加到同一個(gè)BulkRequest:
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "3")); request.add(new UpdateRequest("posts", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts").id("4") .source(XContentType.JSON,"field", "baz"));
向BulkRequest添加DeleteRequest。
向BulkRequest添加UpdateRequest。
使用JSON格式添加IndexRequest。
可選參數(shù)可以選擇提供以下參數(shù):
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
作為TimeValue等待bulk請(qǐng)求執(zhí)行的超時(shí)。
作為String等待bulk請(qǐng)求執(zhí)行的超時(shí)。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
作為WriteRequest.RefreshPolicy實(shí)例的刷新策略。
作為String的刷新策略。
request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL);
設(shè)置在繼續(xù)執(zhí)行索引/更新/刪除操作之前必須活動(dòng)的碎片副本的數(shù)量。
作為ActiveShardCount提供的碎片副本的數(shù)量:可以是ActiveShardCount.ALL、ActiveShardCount.ONE、ActiveShardCount.DEFAULT(默認(rèn))。
request.pipeline("pipelineId");
全局pipelineId用于所有子請(qǐng)求,除非在子請(qǐng)求上重寫。
request.routing("routingId");
全局routingId用于所有子請(qǐng)求,除非在子請(qǐng)求上重寫。
BulkRequest defaulted = new BulkRequest("posts");
在所有子請(qǐng)求上使用全局索引的bulk請(qǐng)求,除非在子請(qǐng)求上重寫,這個(gè)參數(shù)是@Nullable,并只能在創(chuàng)建BulkRequest時(shí)設(shè)置。
同步執(zhí)行當(dāng)以以下方式執(zhí)行BulkRequest時(shí),客戶端等待BulkResponse返回,然后繼續(xù)執(zhí)行代碼:
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
在高級(jí)別REST客戶端中解析REST響應(yīng)失敗、請(qǐng)求超時(shí)或類似的情況,其中沒(méi)有來(lái)自服務(wù)器的響應(yīng)的情況下,同步調(diào)用可能引發(fā)IOException。
在服務(wù)器返回4xx或5xx錯(cuò)誤代碼的情況下,高級(jí)別客戶端嘗試解析響應(yīng)體錯(cuò)誤細(xì)節(jié),然后拋出一個(gè)通用的ElasticsearchException并將原始的ResponseException作為一個(gè)被抑制的異常添加到它。
異步執(zhí)行還可以以異步方式執(zhí)行BulkRequest,以便客戶端可以直接返回,用戶需要指定如何通過(guò)將請(qǐng)求和偵聽器傳遞給異步塊方法來(lái)處理響應(yīng)或潛在故障:
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
要執(zhí)行的BulkRequest和在執(zhí)行完成時(shí)使用的ActionListener。
異步方法不會(huì)阻塞并立即返回,一旦執(zhí)行完成,ActionListener將使用onResponse方法(如果執(zhí)行成功)被調(diào)用,或者使用onFailure方法(如果執(zhí)行失?。┍徽{(diào)用,失敗情況和預(yù)期的異常與同步執(zhí)行情況相同。
一個(gè)典型的bulk監(jiān)聽器是這樣的:
ActionListenerlistener = new ActionListener () { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } };
onResponse當(dāng)執(zhí)行成功完成時(shí)調(diào)用。
onFailure當(dāng)整個(gè)BulkRequest失敗時(shí)調(diào)用。
Bulk響應(yīng)返回的BulkResponse包含執(zhí)行操作的信息,允許對(duì)每個(gè)結(jié)果進(jìn)行如下迭代:
for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); switch (bulkItemResponse.getOpType()) { case INDEX: case CREATE: IndexResponse indexResponse = (IndexResponse) itemResponse; break; case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponse; break; case DELETE: DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
遍歷所有操作的結(jié)果。
檢索操作的響應(yīng)(成功與否),可以是IndexResponse、UpdateResponse或DeleteResponse,它們都可以看作DocWriteResponse實(shí)例。
處理索引操作的響應(yīng)。
處理更新操作的響應(yīng)。
處理刪除操作的響應(yīng)。
Bulk響應(yīng)提供了一種方法來(lái)快速檢查一個(gè)或多個(gè)操作是否失?。?/p>
if (bulkResponse.hasFailures()) { }
如果至少有一個(gè)操作失敗,此方法將返回true。
在這種情況下,需要對(duì)所有的操作結(jié)果進(jìn)行迭代,以檢查操作是否失敗,如果失敗,則檢索相應(yīng)的失?。?/p>
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } }
指示給定操作是否失敗。
檢索失敗操作的失敗。
Bulk處理器BulkProcessor提供了一個(gè)實(shí)用程序類,允許索引/更新/刪除操作在添加到處理器時(shí)透明地執(zhí)行,從而簡(jiǎn)化了Bulk API的使用。
為了執(zhí)行請(qǐng)求,BulkProcessor需要以下組件:
RestHighLevelClient
此客戶端用于執(zhí)行BulkRequest并檢索BulkResponse。
BulkProcessor.Listener
在每次執(zhí)行BulkRequest之前和之后,或者當(dāng)一個(gè)BulkRequest失敗時(shí),都會(huì)調(diào)用這個(gè)偵聽器。
然后BulkProcessor.builder方法可以用來(lái)構(gòu)建一個(gè)新的BulkProcessor:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } }; BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener).build();
創(chuàng)建BulkProcessor.Listener。
beforeBulk方法在每次執(zhí)行BulkRequest之前調(diào)用。
afterBulk方法在每次執(zhí)行BulkRequest之后調(diào)用。
帶failure參數(shù)的afterBulk方法在BulkRequest失敗時(shí)調(diào)用。
通過(guò)從BulkProcessor.builder調(diào)用build()方法創(chuàng)建BulkProcessor,RestHighLevelClient.bulkAsync()方法將用于在后臺(tái)執(zhí)行BulkRequest。
BulkProcessor.Builder提供了一些方法來(lái)配置BulkProcessor應(yīng)該如何處理請(qǐng)求執(zhí)行:
BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); builder.setBulkActions(500); builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
根據(jù)當(dāng)前添加的操作數(shù)量設(shè)置刷新新bulk請(qǐng)求的時(shí)間(默認(rèn)為1000,使用-1禁用它)。
根據(jù)當(dāng)前添加的操作大小設(shè)置刷新新bulk請(qǐng)求的時(shí)間(默認(rèn)為5Mb,使用-1禁用)。
設(shè)置允許執(zhí)行的并發(fā)請(qǐng)求數(shù)量(默認(rèn)為1,使用0只允許執(zhí)行單個(gè)請(qǐng)求)。
設(shè)置刷新間隔,如果間隔通過(guò),則刷新任何掛起的BulkRequest(默認(rèn)為未設(shè)置)。
設(shè)置一個(gè)常量后退策略,該策略最初等待1秒并重試最多3次,有關(guān)更多選項(xiàng),請(qǐng)參見(jiàn)BackoffPolicy.noBackoff()、BackoffPolicy.constantBackoff()和BackoffPolicy.exponentialBackoff()。
一旦創(chuàng)建了BulkProcessor,就可以向它添加請(qǐng)求:
IndexRequest one = new IndexRequest("posts").id("1") .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts").id("2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts").id("3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
請(qǐng)求將由BulkProcessor執(zhí)行,它負(fù)責(zé)為每個(gè)bulk請(qǐng)求調(diào)用BulkProcessor.Listener。
監(jiān)聽器提供訪問(wèn)BulkRequest和BulkResponse的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); } };
beforeBulk在執(zhí)行BulkRequest的每次執(zhí)行之前調(diào)用,這個(gè)方法允許知道將要在BulkRequest中執(zhí)行的操作的數(shù)量。
afterBulk在每次執(zhí)行BulkRequest之后調(diào)用,這個(gè)方法允許知道BulkResponse是否包含錯(cuò)誤。
如果BulkRequest失敗,則調(diào)用帶failure參數(shù)的afterBulk方法,該方法允許知道失敗。
將所有請(qǐng)求添加到BulkProcessor之后,需要使用兩種可用的關(guān)閉方法之一關(guān)閉它的實(shí)例。
awaitClose()方法可以用來(lái)等待,直到所有的請(qǐng)求都被處理完畢或者指定的等待時(shí)間過(guò)去:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
如果所有bulk請(qǐng)求都已完成,則該方法返回true,如果在所有bulk請(qǐng)求完成之前的等待時(shí)間已經(jīng)過(guò)去,則返回false。
close()方法可用于立即關(guān)閉BulkProcessor:
這兩種方法都在關(guān)閉處理器之前刷新添加到處理器的請(qǐng)求,并且禁止向處理器添加任何新請(qǐng)求。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/75436.html
摘要:用于的官方高級(jí)別客戶端,基于低級(jí)別客戶端,它公開特定的方法,并負(fù)責(zé)請(qǐng)求編組和響應(yīng)反編組。入門初始化執(zhí)行請(qǐng)求讀取響應(yīng)日志記錄通用配置嗅探器在中被添加。依賴于核心項(xiàng)目,它接受與相同的請(qǐng)求參數(shù),并返回相同的響應(yīng)對(duì)象。 Elasticsearch Java REST Client Java REST Client有兩種類型: Java Low Level REST Client:用于Elast...
摘要:入門本節(jié)描述從獲取工件到在應(yīng)用程序中使用它如何開始使用高級(jí)別客戶端。保證能夠與運(yùn)行在相同主版本和大于或等于的次要版本上的任何節(jié)點(diǎn)通信。與具有相同的發(fā)布周期,將版本替換為想要的客戶端版本。 Java High Level REST Client 入門 本節(jié)描述從獲取工件到在應(yīng)用程序中使用它如何開始使用高級(jí)別REST客戶端。 兼容性 Java High Level REST Client需...
摘要:如果文檔存在,則返回,否則返回。禁用提取存儲(chǔ)的字段。異步方法不會(huì)阻塞并立即返回,完成后,如果執(zhí)行成功完成,則使用方法回調(diào),如果失敗則使用方法。的典型偵聽器如下所示執(zhí)行成功完成時(shí)調(diào)用。 Exists API 如果文檔存在,則existsAPI返回true,否則返回false。 Exists請(qǐng)求 它就像Get API一樣使用GetRequest,支持所有可選參數(shù),由于exists()只返回...
閱讀 3124·2021-11-24 09:39
閱讀 986·2021-09-07 10:20
閱讀 2406·2021-08-23 09:45
閱讀 2282·2021-08-05 10:00
閱讀 582·2019-08-29 16:36
閱讀 846·2019-08-29 11:12
閱讀 2831·2019-08-26 11:34
閱讀 1848·2019-08-26 10:56