Elasticsearch Java Rest Client API 整理总结 (一) (4)

如果在 UpdateRequest 中使能了获取源数据,响应中则包含了更新后的源文档信息。

GetResult result = updateResponse.getGetResult(); if (result.isExists()) { String sourceAsString = result.sourceAsString(); // 将获取的文档以 string 格式输出 Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式输出 byte[] sourceAsBytes = result.source(); // 字节形式 } else { // 默认情况下,不会返回文档源数据 }

也可以检测是否分片失败

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功的分片数量小于总分片数量 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 得到分片失败的原因 } }

如果在执行 UpdateRequest 时,文档不存在,响应中会包含 404 状态码,而且会抛出 ElasticsearchException 。

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist") .doc("field", "value"); try { UpdateResponse updateResponse = client.update(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { // 处理文档不存在的情况 } }

如果版本冲突,也会抛出 ElasticsearchException

UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("field", "value") .version(1); try { UpdateResponse updateResponse = client.update(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { // 处理版本冲突的情况 } } Bulk API 批量处理 批量请求

使用 BulkRequest 可以在一次请求中执行多个索引,更新和删除的操作。

BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts", "doc", "1") .source(XContentType.JSON,"field", "foo")); // 将第一个 IndexRequest 添加到批量请求中 request.add(new IndexRequest("posts", "doc", "2") .source(XContentType.JSON,"field", "bar")); // 第二个 request.add(new IndexRequest("posts", "doc", "3") .source(XContentType.JSON,"field", "baz")); // 第三个

在同一个 BulkRequest 也可以添加不同的操作类型

BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "doc", "3")); request.add(new UpdateRequest("posts", "doc", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts", "doc", "4") .source(XContentType.JSON,"field", "baz")); 可选参数

超时时间

request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");

设置在批量操作前必须有几个分片处于激活状态

request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL); // 全部分片都处于激活状态 request.waitForActiveShards(ActiveShardCount.DEFAULT); // 默认 request.waitForActiveShards(ActiveShardCount.ONE); // 一个 同步请求 BulkResponse bulkResponse = client.bulk(request); 异步请求

与 GETAPI 等请求类似,只贴代码。

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } }; client.bulkAsync(request, listener); Bulk Response

BulkResponse 中包含执行操作后的信息,并允许对每个操作结果迭代。

for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍历所有的操作结果 DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 获取操作结果的响应,可以是 IndexResponse, UpdateResponse or DeleteResponse, 它们都可以惭怍是 DocWriteResponse 实例 if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作后的响应结果 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作后的响应结果 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作后的响应结果 } }

此外,批量响应还有一个非常便捷的方法来检测是否有一个或多个操作失败

if (bulkResponse.hasFailures()) { // 表示至少有一个操作失败 }

在这种情况下,我们要遍历所有的操作结果,检查是否是失败的操作,并获取对应的失败信息

for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { // 检测给定的操作是否失败 BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 获取失败信息 } } Bulk Processor

BulkProcessor 是为了简化 Bulk API 的操作提供的一个工具类,要执行操作,就需要下面组件

RestHighLevelClient 用来执行 BulkRequest 并获取 BulkResponse`

BulkProcessor.Listener 对 BulkRequest 执行前后以及失败时监听

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwpsx.html