Elasticsearch Java Rest Client API 整理总结 (一) (5)

BulkProcessor.builder 方法用来构建一个新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // 在每个 BulkRequest 执行前调用 } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // 在每个 BulkRequest 执行后调用 } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 失败时调用 } }; BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build(); // 构建 BulkProcessor, RestHighLevelClient.bulkAsync() 用来执行 BulkRequest

BulkProcessor.Builder 提供了多个方法来配置 BulkProcessor
如何来处理请求的执行。

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener); builder.setBulkActions(500); // 指定多少操作时,就会刷新一次 builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); // 指定多大容量,就会刷新一次 builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允许并发执行的数量 builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3));

BulkProcessor 创建后,各种请求就可以添加进去:

IndexRequest one = new IndexRequest("posts", "doc", "1"). source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts", "doc", "2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts", "doc", "3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);

BulkProcessor 执行时,会对每个 bulk request调用 BulkProcessor.Listener , listener 提供了下面方法来访问 BulkRequest 和 BulkResponse:

BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); // 在执行前获取操作的数量 logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { // 执行后查看响应中是否包含失败的操作 logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); // 请求失败时打印信息 } };

请求添加到 BulkProcessor , 它的实例可以使用下面两种方法关闭请求。

awaitClose() 在请求返回后或等待一定时间关闭

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close() 立刻关闭

bulkProcessor.close();

两个方法都会在关闭前对处理器中的请求进行刷新,并避免新的请求添加进去。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwpsx.html