获得的 QueryAction 我们可以通过 QueryActionElasticExecutor类的executeAnyAction方法来接受,并内部处理,然后就能获得相应的执行结果。
public static Object executeAnyAction(Client client , QueryAction queryAction) throws SqlParseException, IOException { if(queryAction instanceof DefaultQueryAction) return executeSearchAction((DefaultQueryAction) queryAction); if(queryAction instanceof AggregationQueryAction) return executeAggregationAction((AggregationQueryAction) queryAction); if(queryAction instanceof ESJoinQueryAction) return executeJoinSearchAction(client, (ESJoinQueryAction) queryAction); if(queryAction instanceof MultiQueryAction) return executeMultiQueryAction(client, (MultiQueryAction) queryAction); if(queryAction instanceof DeleteQueryAction ) return executeDeleteAction((DeleteQueryAction) queryAction); return null; }虽然得到了查询结果,但是它是一个Object类型,我们还需要定制化一下,注意到了一个类:ObjectResultsExtractor,它的构造函数如下,构造函数包含三个布尔类型的参数。它们的作用是在结果集中是否包含score,是否包含type,是否包含ID,我们可以都设置为 false。
public ObjectResultsExtractor(boolean includeScore, boolean includeType, boolean includeId) { this.includeScore = includeScore; this.includeType = includeType; this.includeId = includeId; this.currentLineIndex = 0; }ObjectResultsExtractor它仅有一个对外的 pulic 修饰的方法extractResults。
public ObjectResult extractResults(Object queryResult, boolean flat) throws ObjectResultsExtractException { if (queryResult instanceof SearchHits) { SearchHit[] hits = ((SearchHits) queryResult).getHits(); List<Map<String, Object>> docsAsMap = new ArrayList<>(); List<String> headers = createHeadersAndFillDocsMap(flat, hits, docsAsMap); List<List<Object>> lines = createLinesFromDocs(flat, docsAsMap, headers); return new ObjectResult(headers, lines); } if (queryResult instanceof Aggregations) { List<String> headers = new ArrayList<>(); List<List<Object>> lines = new ArrayList<>(); lines.add(new ArrayList<Object>()); handleAggregations((Aggregations) queryResult, headers, lines); // remove empty line。 if(lines.get(0).size() == 0) { lines.remove(0); } //todo: need to handle more options for aggregations: //Aggregations that inhrit from base //ScriptedMetric return new ObjectResult(headers, lines); } return null; }至此我们就大致了解了它的查询API ,然后我们只需要在我们项目中做如下的代码调用就可以完成我们的查询功能了,最后得到的ObjectResult就是我们的最终查询结果集了。
//1.解释SQL SearchDao searchDao = new SearchDao(transportClient); QueryAction queryAction = searchDao.explain(sql); //2.执行 Object execution = QueryActionElasticExecutor.executeAnyAction(searchDao.getClient(), queryAction); //3.格式化查询结果 ObjectResult result = (new ObjectResultsExtractor(true, false, false)).extractResults(execution, true);至此,代码开发完成,我们来测试下运行结果,我对外提供了三个接口,一个是 API方式查询,一个是JDBC方式查询,还有一个解释SQL。
@RestController @RequestMapping("/es/data") public class ElasticSearchController { @Autowired private ElasticSearchSqlService elasticSearchSqlService; @PostMapping(value = "/search") public CommonResult search(@RequestBody QueryDto queryDto) { SearchResultDTO resultDTO = elasticSearchSqlService.search(queryDto.getSql()); return CommonResult.success(resultDTO.getResult()); } @PostMapping(value = "/query") public CommonResult query(@RequestBody QueryDto queryDto) { SearchResultDTO resultDTO = elasticSearchSqlService.query(queryDto.getSql(), queryDto.getIndex()); return CommonResult.success(resultDTO.getResult()); } @PostMapping(value = "/explain") public CommonResult explain(@RequestBody QueryDto queryDto) { return CommonResult.success(elasticSearchSqlService.explain(queryDto.getSql())); } }请求示例:
查询结果示例:
总结SQL 虽然不是 ES 官方推荐的查询语言,但是由于他的便捷性,ES 官方也开始意识到这块。ES 在 6.3.0版本后也开始支持 SQL了,但是他是通过引入 x-pack 的方式,如归我们可以通过 REST 方式使用,但是我们引入到开发中还是有点问题,需要铂金会员才行,不知道以后会不会放开。
另外,SQL 虽然使用起来比较方便,但是毕竟不是官方指定的,所以难免在功能上有缺陷,没有 DSL 功能强大,而且里面的坑比较多,但是基本的查询都支持。所以如果不是迫不得已,我还是建议使用 DSL,而一些简单的操作可以用SQL来辅助,本篇文章源码都已上传到本人的 Github ,如果感兴趣的读者可以关注我的 Github。