如果文章有错的地方欢迎指正,大家互相交流。习惯在微信看技术文章,想要获取更多的Java资源的同学,可以关注微信公众号:Java3y
四、REST API

在前面一章中,我们已经下载了Elasticsearch和它常用的各种插件。了解相关的概念后,就应该动手实践一下,看看Elasticsearch具体是怎么用的。
参考链接:
我是跟着这个教程大致把REST API走了一下,也渐渐了解了Elasticsearch的知识点了。
五、Java API

我们毕竟是使用Java的,因此得知道如何在Java中连接并操作Elasticsearch。我找了几篇教程:
后来找到了一篇比较系统的教程,推荐这个:
自己也跟着教程写了Demo,其中也出了不少错误。贴上代码:
import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.deletebyquery.DeleteByQueryAction; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.shield.ShieldPlugin; import org.junit.Test; import java.io.*; 
import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.concurrent.ExecutionException; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import org.elasticsearch.client.transport.TransportClient; /** * Created by ozc on 2017/11/5. */ public class ElasticsearchDemo { /** * 创建 * @throws UnknownHostException */ @Test public void CreateIndex() throws UnknownHostException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); //将数据转成json字符串,用集合装载起来 List<String> jsonData = DataFactory.getInitJsonData(); //创建blog索引、article类型、数据是上边的json字符串 for (int i = 0; i < jsonData.size(); i++) { IndexResponse response = client.prepareIndex("blog", "article","1").setSource(jsonData.get(i)).get(); if (response.isCreated()) { System.out.println("创建成功!"); } } client.close(); } /** * 查询 * @throws UnknownHostException */ @Test public void selectIndex() throws UnknownHostException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", "zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); //查询title字段中包含hibernate关键字的文档 QueryBuilder qb1 = termQuery("title", "hibernate"); //查询title字段或content字段中包含git关键字的文档: QueryBuilder qb2= QueryBuilders.multiMatchQuery("git", "title","content"); SearchResponse response = client.prepareSearch("blog").setTypes("article").setQuery(qb2).execute() .actionGet(); SearchHits hits = response.getHits(); if (hits.totalHits() > 0) { for (SearchHit hit : hits) { System.out.println("score:"+hit.getScore()+":\t"+hit.getSource());// .get("title") } } else { System.out.println("搜到0条结果"); } 
} /** * 使用updateRequest更新 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void update1() throws IOException, ExecutionException, InterruptedException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); UpdateRequest uRequest = new UpdateRequest(); uRequest.index("blog"); uRequest.type("article"); uRequest.id("1"); uRequest.doc(jsonBuilder().startObject().field("content", "学习目标 掌握java泛型的产生意义ssss").endObject()); client.update(uRequest).get(); } /** * 使用脚本更新,需要更改配置文件【我不喜欢用】 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void update2() throws IOException, ExecutionException, InterruptedException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); client.prepareUpdate("blog", "article", "1") .setScript(new Script("ctx._source.title = \"git入门\"", ScriptService.ScriptType.INLINE, null, null)) .get(); } /** * 使用doc更新 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void update3() throws IOException, ExecutionException, InterruptedException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); client.prepareUpdate("blog", "article", "1") .setDoc(jsonBuilder().startObject().field("content", "SVN与Git对比222222。。。").endObject()).get(); } /** * 使用updateRequest更新、能够新增新字段 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void update4() throws IOException, ExecutionException, InterruptedException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); UpdateRequest updateRequest = new 
UpdateRequest("blog", "article", "1") .doc(jsonBuilder().startObject().field("commet", "0").endObject()); client.update(updateRequest).get(); } /** * 使用UpdateRequest更新, 如果文档不存在则创建新的索引 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void update5() throws IOException, ExecutionException, InterruptedException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); IndexRequest indexRequest = new IndexRequest("blog", "article", "10").source(jsonBuilder().startObject() .field("title", "Git安装10").field("content", "学习目标 git。。。10").endObject()); UpdateRequest uRequest2 = new UpdateRequest("blog", "article", "10").doc( jsonBuilder().startObject().field("title", "Git安装").field("content", "学习目标 git。。。").endObject()) .upsert(indexRequest); client.update(uRequest2).get(); } /** * 删除具体的索引值 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void deleteSpecificIndex() throws IOException, ExecutionException, InterruptedException { //连接客户端 Client client = TransportClient.builder().build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); DeleteResponse dResponse = client.prepareDelete("blog", "article", "10").execute() .actionGet(); if (dResponse.isFound()) { System.out.println("删除成功"); } else { System.out.println("删除失败"); } } /** * 根据index名称 删除整个索引库 * @throws IOException * @throws ExecutionException * @throws InterruptedException */ @Test public void deleteIndex() throws IOException, ExecutionException, InterruptedException { //要删除索引库的名字 String indexName = "zhognfucheng"; if (!isIndexExists(indexName)) { System.out.println(indexName + " not exists"); } else { Client client = TransportClient.builder().build().addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); DeleteIndexResponse dResponse = 
client.admin().indices().prepareDelete(indexName) .execute().actionGet(); if (dResponse.isAcknowledged()) { System.out.println("delete index "+indexName+" successfully!"); }else{ System.out.println("Fail to delete index "+indexName); } } } // 创建索引库 @Test public void createIndex() { //要创建索引库的名称 String indexName = "shcool"; try { Client client = TransportClient.builder().build().addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); // 创建索引库 if (isIndexExists(indexName)) { System.out.println("Index " + indexName + " already exits!"); } else { CreateIndexRequest cIndexRequest = new CreateIndexRequest(indexName); CreateIndexResponse cIndexResponse = client.admin().indices().create(cIndexRequest) .actionGet(); if (cIndexResponse.isAcknowledged()) { System.out.println("create index successfully!"); } else { System.out.println("Fail to create index!"); } } } catch (UnknownHostException e) { e.printStackTrace(); } } /** * 批量从Elasticsearch导出json到文件中 */ @Test public void ElasticSearchBulkOut() { try { //初始化 Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch").build();// cluster.name在elasticsearch.yml //连接客户端 Client client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("127.0.0.1"), 9300)); //匹配所有查询 QueryBuilder qb = QueryBuilders.matchAllQuery(); SearchResponse response = client.prepareSearch("blog") .setTypes("article").setQuery(qb) .execute().actionGet(); //获取命中记录 SearchHits resultHits = response.getHits(); //遍历命中记录,写到文件中 File article = new File("C:\\ElasticsearchDemo\\src\\java\\file\\bulk.txt"); FileWriter fw = new FileWriter(article); BufferedWriter bfw = new BufferedWriter(fw); if (resultHits.getHits().length == 0) { System.out.println("查到0条数据!"); } else { for (int i = 0; i < resultHits.getHits().length; i++) { String jsonStr = resultHits.getHits()[i] .getSourceAsString(); System.out.println(jsonStr); 
bfw.write(jsonStr); bfw.write("\n"); } } bfw.close(); fw.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 批量导入到Elasticsearch中 */ @Test public void ElasticSearchBulkInput() { try { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch").build();// cluster.name在elasticsearch.yml中配置 Client client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("127.0.0.1"), 9300)); File article = new File("C:\\ElasticsearchDemo\\src\\java\\file\\bulk.txt"); FileReader fr=new FileReader(article); BufferedReader bfr=new BufferedReader(fr); String line = null; BulkRequestBuilder bulkRequest=client.prepareBulk(); int count=0; while((line=bfr.readLine())!=null){ bulkRequest.add(client.prepareIndex("test","article").setSource(line)); if (count%10==0) { bulkRequest.execute().actionGet(); } count++; //System.out.println(line); } bulkRequest.execute().actionGet(); bfr.close(); fr.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 根据名称判断索引是否存在 * @param indexName * @return */ public boolean isIndexExists(String indexName) { boolean flag = false; try { Client client = TransportClient.builder().build().addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName); //下面可判断多个索引名称是否存在 //IndicesExistsResponse indexResponse = client.admin().indices().prepareExists("blog","blog1").execute().actionGet(); IndicesExistsResponse inExistsResponse = client.admin().indices() .exists(inExistsRequest).actionGet(); if (inExistsResponse.isExists()) { flag = true; } else { flag = false; } } catch (UnknownHostException e) { e.printStackTrace(); } return flag; } /** * 删除type下的所有文档 * 也就是类似关系型数据库中根据表删除所有记录 
* * ps(需要下载插件plugin install delete-by-query)和导入pom包 * * ps(使用的是windows服务的方式运行Elastic,因此需要在bin目录下的使用service重启,不然就会报空指针异常! */ @Test public void deleteByQuery() throws UnknownHostException { Client client = TransportClient.builder().build().addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); String deletebyquery = "{\"query\": {\"match_all\": {}}}"; DeleteByQueryRequestBuilder response = new DeleteByQueryRequestBuilder(client,DeleteByQueryAction.INSTANCE); response.setIndices("blog").setTypes("article").setSource(deletebyquery) .execute() .actionGet(); } //------------------------------------------------------------------- /** * Java三层嵌套查询,我感觉是用得很少的。贴个链接把: * * <p> * 搜索有相同父id的子文档,我也感觉用得少。贴个链接吧: * * * 集群配置,现在用不上。贴个链接吧: * */ //------------------------------------------------------------------- /** * 删除index为blog、类型为article、id为1的索引中的id属性 * (需要在配置文件中添加以下内容),否则会出现异常 * * script.inline: on script.indexed: on script.engine.groovy.inline.aggs: on * * @throws UnknownHostException */ @Test public void deleteField() throws UnknownHostException { Client client = TransportClient.builder().build().addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); client.prepareUpdate("blog", "article", "1").setScript(new Script("ctx._source.remove(\"title\")",ScriptService.ScriptType.INLINE, null, null)).get(); //删除属性中的属性 //client.prepareUpdate("test", "document", "1").setScript(new Script( "ctx._source.processInstance.remove(\"id\")",ScriptService.ScriptType.INLINE, null, null)).get(); } /** * 添加了Elasticsearch安全插件以后,连接cliet的方式就要改变了。 * * 网站给的maven坐标根本找不到,只能下载了个jar包用了。 * * 配置了Elasticsearch认证以后,kibana也需要配置,不然进不去的。 * * 链接: * * * 当前配置的Elasticsearch zhongfucheng:zhongfucheng * 当前配置的kibana kibanaserver:zhongfucheng (ps:此部分还需要修改配置文件,详情看上边的连接) * */ @Test public void changeClient() throws UnknownHostException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", 
"zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); System.out.println(client); } /** * 判断类型是否存在 * @throws UnknownHostException */ @Test public void isTypeExists() throws UnknownHostException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", "zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); // bolg是index、article是类型 TypesExistsResponse typeResponse = client.admin().indices() .prepareTypesExists("blog").setTypes("article") .execute().actionGet(); System.out.println(typeResponse.isExists()); } /** * 关闭索引 */ @Test public void closeIndex() throws UnknownHostException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", "zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); CloseIndexResponse cIndexResponse = client.admin().indices().prepareClose("shcool") .execute().actionGet(); if (cIndexResponse.isAcknowledged()) { System.out.println("关闭索引成功"); } } /** * 打开索引 * @throws UnknownHostException */ @Test public void openIndex() throws UnknownHostException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", "zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new 
InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); OpenIndexResponse oIndexResponse = client.admin().indices() .prepareOpen("shcool") .execute().actionGet(); System.out.println(oIndexResponse.isAcknowledged()); } /** * 获取文档的集合 * 可以获取同一索引、同一类型、不同id的文档集合 * 也可以获取不同索引、不同类型、不同id的文档集合 */ @Test public void getDocCollection() throws UnknownHostException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", "zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("blog", "article", "1") //注释1 //.add("twitter", "tweet", "2", "3", "4") //注释2 .add("test", "article", "AV-K5x1NAQtVzQCf247Q") //注释3 .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //注释4 GetResponse response = itemResponse.getResponse(); if (response.isExists()) { //注释5 String json = response.getSourceAsString(); //注释6 System.out.println(json); } } /** 注释1: 通过单一的ID获取一个文档. 注释2:传入多个id,从相同的索引名/类型名中获取多个文档. 注释3:可以同时获取不同索引中的文档. 注释4:遍历结果集. 注释5:检验文档是否存在. 注释6:获取文档源. 
*/ } /** * 获取索引下每个Type和Mapping * @throws IOException */ @Test public void getIndexTypeAndMapping() throws IOException { Settings settings = Settings.settingsBuilder() .put("cluster.name", "elasticsearch") .put("shield.user", "zhongfucheng:zhongfucheng") .build(); TransportClient client = TransportClient.builder() .addPlugin(ShieldPlugin.class) .settings(settings).build(); client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); ImmutableOpenMap<String, MappingMetaData> mappings = client.admin().cluster().prepareState().execute() .actionGet().getState().getMetaData().getIndices().get("blog").getMappings(); for (ObjectObjectCursor<String, MappingMetaData> cursor : mappings) { System.out.println(cursor.key); // 索引下的每个type System.out.println(cursor.value.getSourceAsMap()); // 每个type的mapping } } /** * * Elasticsearch还有分析聚合这么一个功能,类似与Mysql中的分组函数、统计数据。 * 贴个链接: * * */ }