Elasticsearch--客户端

墨蓝 2024-03-17 20:05 108阅读 0赞

Es客户端

语言无关

ac8909b1738a4ea1ad21a7f1058b538c.png

java最常用的客户端是Java Client、Java Rest Client、Java Transport Client

Java Client

从es7.17开始,官方推出Java Client,并且将Java Rest Client标为Deprecated(过期)

要求jdk至少要jdk8

具体用法再看===》

Java Rest Client

Java Rest Client分为:

Java Low level Rest Client

Java High level Rest Client

在es7.15的时候过期的

RestClient 是线程安全的,RestClient使用 Elasticsearch 的 HTTP 服务,默认为9200端口,这一点和transport client不同。

Java Low level Rest Client

之所以称为低级客户端,是因为它几乎没有帮助 Java 用户构建请求或解析响应。它处理请求的路径和查询字符串构造,但它将 JSON 请求和响应主体视为必须由用户处理的不透明字节数组。

特点
  • 与任何 Elasticsearch 版本兼容

    • ES 5.0.0只是发布第一个Java Low-level REST client时的ES版本(2016年),不代表其向前只兼容到5.0,Java Low-level REST client基于Apache HTTP 客户端,它允许使用 HTTP 与任何版本的 Elasticsearch 集群进行通信。
  • 最小化依赖
  • 跨所有可用节点的负载平衡
  • 在节点故障和特定响应代码的情况下进行故障转移
  • 连接失败惩罚(是否重试失败的节点取决于它连续失败的次数;失败的尝试越多,客户端在再次尝试同一节点之前等待的时间就越长)
  • 持久连接
  • 请求和响应的跟踪记录
  • 可选的集群节点自动发现(也称为嗅探)

#

Java High Level REST Client

Java 高级 REST 客户端在 Java 低级 REST 客户端之上运行。

它的主要目标是公开 API 特定的方法,接受请求对象作为参数并返回响应对象,以便请求编组和响应解组由客户端本身处理。

要求Elasticsearch版本为2.0或者更高。

maven

  1. <dependency>
  2. <groupId>org.elasticsearch.client</groupId>
  3. <artifactId>elasticsearch-rest-client</artifactId>
  4. <version>7.12.0</version>
  5. </dependency>

初始化

  1. // 初始化
  2. RestClient restClient = RestClient.builder(
  3. new HttpHost("localhost1", 9200, "http"),
  4. new HttpHost("localhost2", 9200, "http")).build();
  5. // 资源释放
  6. restClient.close();

简单用法

  1. @Test
  2. @SneakyThrows
  3. public void createIndex() {
  4. //region 创建客户端对象
  5. RestHighLevelClient client = new RestHighLevelClient(
  6. RestClient.builder(
  7. new HttpHost("localhost", 9200, "http")
  8. )
  9. );
  10. //endregion
  11. //region Request对象
  12. CreateIndexRequest request = new CreateIndexRequest("product2");
  13. //endregion
  14. //region 组装数据
  15. //region setting
  16. request.settings(Settings.builder()
  17. .put("index.number_of_shards", 3)
  18. .put("index.number_of_replicas", 0)
  19. );
  20. //endregion
  21. //region mapping
  22. // request.mapping(
  23. // "{\n" +
  24. // " \"properties\": {\n" +
  25. // " \"message\": {\n" +
  26. // " \"type\": \"text\"\n" +
  27. // " }\n" +
  28. // " }\n" +
  29. // "}",
  30. // XContentType.JSON);
  31. //region 还可以使用Map构建
  32. // Map<String, Object> message = new HashMap<>();
  33. // message.put("type", "text");
  34. // Map<String, Object> properties = new HashMap<>();
  35. // properties.put("message", message);
  36. // Map<String, Object> mapping = new HashMap<>();
  37. // mapping.put("properties", properties);
  38. // request.mapping(mapping);
  39. //endregion
  40. //region 使用XContentBuilder构建
  41. // XContentBuilder builder = XContentFactory.jsonBuilder();
  42. // builder.startObject();
  43. // {
  44. // builder.startObject("properties");
  45. // {
  46. // builder.startObject("message");
  47. // {
  48. // builder.field("type", "text");
  49. // }
  50. // builder.endObject();
  51. // }
  52. // builder.endObject();
  53. // }
  54. // builder.endObject();
  55. // request.mapping(builder);
  56. //endregion
  57. //endregion
  58. //region 别名
  59. request.alias(new Alias("product_alias").filter(QueryBuilders.termQuery("name", "xiaomi")));
  60. //endregion
  61. request.timeout(TimeValue.timeValueMillis(2));
  62. //endregion
  63. // 同步
  64. CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
  65. // 异步
  66. client.indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
  67. @Override
  68. public void onResponse(CreateIndexResponse createIndexResponse) {
  69. }
  70. @Override
  71. public void onFailure(Exception e) {
  72. }
  73. });
  74. // 是否所有节点都已确认请求
  75. createIndexResponse.isAcknowledged();
  76. // 在超时之前是否为索引中的每个碎片启动所需数量的碎片副本
  77. createIndexResponse.isShardsAcknowledged();
  78. client.close();
  79. }
  80. @Test
  81. @SneakyThrows
  82. public void getIndex() {
  83. RestHighLevelClient client = new RestHighLevelClient(
  84. RestClient.builder(
  85. new HttpHost("localhost", 9200, "http")
  86. )
  87. );
  88. GetIndexRequest request = new GetIndexRequest("product*");
  89. GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
  90. String[] indices = response.getIndices();
  91. for (String indexName : indices) {
  92. System.out.println("index name:" + indexName);
  93. }
  94. client.close();
  95. }
  96. @Test
  97. @SneakyThrows
  98. public void delIndex() {
  99. RestHighLevelClient client = new RestHighLevelClient(
  100. RestClient.builder(
  101. new HttpHost("localhost", 9200, "http")
  102. )
  103. );
  104. DeleteIndexRequest request = new DeleteIndexRequest("product2");
  105. AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
  106. if (response.isAcknowledged()) {
  107. System.out.println("删除index成功!");
  108. } else {
  109. System.out.println("删除index失败!");
  110. }
  111. client.close();
  112. }
  113. @Test
  114. @SneakyThrows
  115. public void insertData() {
  116. //region 创建连接
  117. RestHighLevelClient client = new RestHighLevelClient(
  118. RestClient.builder(
  119. new HttpHost("localhost", 9200, "http")
  120. )
  121. );
  122. //endregion
  123. //region 准备数据
  124. List<Product> list = service.list();
  125. //endregion
  126. //region 创建Request对象
  127. //插入数据,index不存在则自动根据匹配到的template创建。index没必要每天创建一个,如果是为了灵活管理,最低建议每月一个 yyyyMM。
  128. IndexRequest request = new IndexRequest("test_index");
  129. //endregion
  130. //region 组装数据
  131. Product product = list.get(0);
  132. Gson gson = new Gson();
  133. //最好不要自定义id 会影响插入速度。
  134. request.id(product.getId().toString());
  135. request.source(gson.toJson(product)
  136. , XContentType.JSON);
  137. //endregion
  138. //region 执行Index操作
  139. IndexResponse response = client.index(request, RequestOptions.DEFAULT);
  140. //endregion
  141. System.out.println(response);
  142. client.close();
  143. }
  144. @Test
  145. @SneakyThrows
  146. public void batchInsertData() {
  147. //region 创建连接
  148. RestHighLevelClient client = new RestHighLevelClient(
  149. RestClient.builder(
  150. new HttpHost("localhost", 9200, "http")
  151. )
  152. );
  153. //endregion
  154. //region 创建Request对象
  155. //批量插入数据,更新和删除同理
  156. BulkRequest request = new BulkRequest("test_index");
  157. //endregion
  158. //region 组装数据
  159. Gson gson = new Gson();
  160. Product product = new Product();
  161. product.setPrice(3999.00);
  162. product.setDesc("xioami");
  163. for (int i = 0; i < 10; i++) {
  164. product.setName("name" + i);
  165. request.add(new IndexRequest()
  166. .id(Integer.toString(i))
  167. .source(gson.toJson(product)
  168. , XContentType.JSON)
  169. );
  170. }
  171. //endregion
  172. BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
  173. System.out.println("数量:" + response.getItems().length);
  174. client.close();
  175. }
  176. @Test
  177. @SneakyThrows
  178. public void getById() {
  179. //region 创建连接
  180. RestHighLevelClient client = new RestHighLevelClient(
  181. RestClient.builder(
  182. new HttpHost("localhost", 9200, "http")));
  183. //endregion
  184. //region 创建Request对象
  185. //注意 这里查询使用的是别名。
  186. GetRequest request = new GetRequest("test_index", "6");
  187. //endregion
  188. //region 组装数据
  189. String[] includes = {
  190. "name", "price"};
  191. String[] excludes = {
  192. "desc"};
  193. FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
  194. //只查询特定字段。如果需要查询所有字段则不设置该项。
  195. request.fetchSourceContext(fetchSourceContext);
  196. //endregion
  197. //region 响应数据
  198. GetResponse response = client.get(request, RequestOptions.DEFAULT);
  199. //endregion
  200. System.out.println(response);
  201. client.close();
  202. }
  203. @Test
  204. public void delById() throws IOException {
  205. //region Description
  206. RestHighLevelClient client = new RestHighLevelClient(
  207. RestClient.builder(
  208. new HttpHost("localhost", 9200, "http")
  209. )
  210. );
  211. //endregion
  212. DeleteRequest request = new DeleteRequest("test_index", "1");
  213. DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
  214. System.out.println(response);
  215. client.close();
  216. }
  217. @Test
  218. public void multiGetById() throws IOException {
  219. //region Description
  220. RestHighLevelClient client = new RestHighLevelClient(
  221. RestClient.builder(
  222. new HttpHost("localhost", 9200, "http")));
  223. //endregion
  224. //region Description
  225. //根据多个id查询
  226. MultiGetRequest request = new MultiGetRequest();
  227. //endregion
  228. //region Description
  229. request.add("test_index", "6");
  230. //两种写法
  231. request.add(new MultiGetRequest.Item(
  232. "test_index",
  233. "7"));
  234. //endregion
  235. //region Description
  236. MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
  237. //endregion
  238. for (MultiGetItemResponse itemResponse : response) {
  239. System.out.println(itemResponse.getResponse().getSourceAsString());
  240. }
  241. client.close();
  242. }
  243. @Test
  244. public void updateByQuery() throws IOException {
  245. //region 连接
  246. RestHighLevelClient client = new RestHighLevelClient(
  247. RestClient.builder(
  248. new HttpHost("localhost", 9200, "http")
  249. )
  250. );
  251. //endregion
  252. //region 请求对象
  253. UpdateByQueryRequest request = new UpdateByQueryRequest("test_index");
  254. //endregion
  255. //region 组装数据
  256. //默认情况下,版本冲突会中止 UpdateByQueryRequest 进程,但是你可以用以下命令来代替
  257. //设置版本冲突继续
  258. // request.setConflicts("proceed");
  259. //设置更新条件
  260. request.setQuery(QueryBuilders.termQuery("name", "name2"));
  261. // //限制更新条数
  262. // request.setMaxDocs(10);
  263. request.setScript(
  264. new Script(ScriptType.INLINE, "painless", "ctx._source.desc+='#';", Collections.emptyMap()));
  265. //endregion
  266. BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
  267. System.out.println(response);
  268. client.close();
  269. }

优缺点

优点

安全:REST API使用单一的集群入口点,可以通过 HTTPS 保障数据安全性,传输层只用于内部节点到节点的通信。
易用:客户端只通过 REST 层而不是通过传输层调用服务,可以大大简化代码编写

缺点

性能略逊于Java API,但是差距不大

Low level Client

优点:
轻依赖:Apache HTTP 异步客户端及其传递依赖项(Apache HTTP 客户端、Apache HTTP Core、Apache HTTP Core NIO、Apache Commons Codec 和 Apache Commons Logging)
兼容性强:兼容所有ES版本
缺点:
功能少:显而易见,轻量化带来的必然后果

High level Client

优点:
功能强大:支持所有ES的API调用。
松耦合:客户端和ES核心服务完全独立,无共同依赖。
接口稳定:REST API 比与 Elasticsearch 版本完全匹配的`Transport Client`接口稳定得多。
缺点:
兼容性中等:基于Low Level Client,只向后兼容ES的大版本,比如6.0的客户端兼容6.x(即6.0之后的版本),但是6.1的客户端未必支持所有6.0ES的API,但是这并不是什么大问题,咱们使用相同版本的客户端和服务端即可,而且不会带来其他问题。

Java Transport Client

使用的客户端名称叫TransportClient

从7.0.0开始,官方已经不建议使用TransportClient作为ES的Java客户端了,并且从8.0会被彻底删除

TransportClient 使用transport模块(9300端口)远程连接到 Elasticsearch 集群,客户端并不加入集群,而是通过获取单个或者多个transport地址来以轮询的方式与他们通信。

TransportClient使用transport协议与Elasticsearch节点通信,如果客户端的版本和与其通信的ES实例的版本不同,就会出现兼容性问题。而low-level REST使用的是HTTP协议,可以与任意版本ES集群通信。high-level REST是基于low-level REST的。

es整合java时,es的版本和java中的版本要保证大版本一致,比如,7.x

es的版本和springboot版本兼容性关系

3e86859015da4c48a136e9e4f946e1e0.png

依赖

  1. <dependency>
  2. <groupId>org.elasticsearch.client</groupId>
  3. <artifactId>transport</artifactId>
  4. <version>7.12.1</version>
  5. </dependency>

连接

  1. // 创建客户端连接
  2. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  3. .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
  4. .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300));
  5. // 关闭客户端
  6. client.close();

简单使用

  1. @SneakyThrows
  2. private void create(TransportClient client) {
  3. List<Product> list = service.list();
  4. for (Product item : list) {
  5. System.out.println(item.getDate().toLocalDateTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
  6. IndexResponse response = client.prepareIndex("product", "_doc", item.getId().toString())
  7. .setSource(XContentFactory.jsonBuilder()
  8. .startObject()
  9. .field("name", item.getName())
  10. .field("desc", item.getDesc())
  11. .field("price", item.getPrice())
  12. .field("date", item.getDate().toLocalDateTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")))
  13. .field("tags", item.getTags().replace("\"", "").split(","))
  14. .endObject())
  15. .get();
  16. System.out.println(response.getResult());
  17. }
  18. }
  19. @SneakyThrows
  20. private void get(TransportClient client) {
  21. GetResponse response = client.prepareGet("product", "_doc", "1").get();
  22. String index = response.getIndex();//获取索引名称
  23. String type = response.getType();//获取索引类型
  24. String id = response.getId();//获取索引id
  25. System.out.println("index:" + index);
  26. System.out.println("type:" + type);
  27. System.out.println("id:" + id);
  28. System.out.println(response.getSourceAsString());
  29. }
  30. private void getAll(TransportClient client) {
  31. SearchResponse response = client.prepareSearch("product")
  32. .get();
  33. SearchHits searchHits = response.getHits();
  34. SearchHit[] hits = searchHits.getHits();
  35. for (SearchHit hit : hits) {
  36. String res = hit.getSourceAsString();
  37. System.out.println("res" + res);
  38. }
  39. }
  40. @SneakyThrows
  41. private void update(TransportClient client) {
  42. UpdateResponse response = client.prepareUpdate("product", "_doc", "2")
  43. .setDoc(XContentFactory.jsonBuilder()
  44. .startObject()
  45. .field("name", "update name")
  46. .endObject())
  47. .get();
  48. System.out.println(response.getResult());
  49. }
  50. @SneakyThrows
  51. private void delete(TransportClient client) {
  52. DeleteResponse response = client.prepareDelete("product", "_doc", "2").get();
  53. System.out.println(response.getResult());
  54. }

kibana中操作的是Rest api

dsl转成代码

4b3dd960aa35477e9d9f539722427ee5.png

  1. void aggSearch() {
  2. //region 1->创建客户端连接
  3. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  4. .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
  5. //endregion
  6. //region 2->计算并返回聚合分析response对象
  7. SearchResponse response = client.prepareSearch("product")
  8. .setSize(0)
  9. .setQuery(QueryBuilders.matchAllQuery())
  10. .addAggregation(AggregationBuilders.dateHistogram("group_by_month")
  11. .field("date")
  12. .calendarInterval(DateHistogramInterval.MONTH)
  13. .minDocCount(1)
  14. .subAggregation(AggregationBuilders.terms("by_tag")
  15. .field("tags.keyword")
  16. .subAggregation(AggregationBuilders.avg("avg_price")
  17. .field("price"))
  18. )
  19. ).execute().actionGet();
  20. //endregion
  21. //region 3->输出结果信息
  22. SearchHit[] hits = response.getHits().getHits();
  23. Map<String, Aggregation> map = response.getAggregations().asMap();
  24. Aggregation group_by_month = map.get("group_by_month");
  25. Histogram dates = (Histogram) group_by_month;
  26. Iterator<Histogram.Bucket> buckets = (Iterator<Histogram.Bucket>) dates.getBuckets().iterator();
  27. while (buckets.hasNext()) {
  28. Histogram.Bucket dateBucket = buckets.next();
  29. System.out.println("\n月份:" + dateBucket.getKeyAsString() + "\n计数:" + dateBucket.getDocCount());
  30. Aggregation by_tag = dateBucket.getAggregations().asMap().get("by_tag");
  31. StringTerms terms = (StringTerms) by_tag;
  32. Iterator<StringTerms.Bucket> tags = terms.getBuckets().iterator();
  33. while (tags.hasNext()) {
  34. StringTerms.Bucket tag = tags.next();
  35. System.out.println("\t标签名称:" + tag.getKey() + "\n\t数量:" + tag.getDocCount());
  36. Aggregation avg_price = tag.getAggregations().get("avg_price");
  37. Avg avg = (Avg) avg_price;
  38. System.out.println("\t平均价格:" + avg.getValue());
  39. }
  40. }
  41. //endregion
  42. client.close();
  43. }

嗅探器sniffer

允许从正在运行的 Elasticsearch 集群中自动发现节点并将它们设置为现有 RestClient 实例的最小库(在集群中,根据一个节点找到其他节点)

依赖

  1. <dependency>
  2. <groupId>org.elasticsearch.client</groupId>
  3. <artifactId>elasticsearch-rest-client-sniffer</artifactId>
  4. <version>7.12.1</version>
  5. </dependency>
  6. // 默认每五分钟发现一次
  7. RestClient restClient = RestClient.builder(
  8. new HttpHost("localhost", 9200, "http"))
  9. .build();
  10. Sniffer sniffer = Sniffer.builder(restClient).build();
  11. // 设置嗅探间隔
  12. RestClient restClient = RestClient.builder(
  13. new HttpHost("localhost", 9200, "http"))
  14. .build();
  15. // 设置嗅探间隔为60000毫秒
  16. Sniffer sniffer = Sniffer.builder(restClient)
  17. .setSniffIntervalMillis(60000).build();
  18. // 失败时重启嗅探
  19. SniffOnFailureListener sniffOnFailureListener =
  20. new SniffOnFailureListener();
  21. RestClient restClient = RestClient.builder(
  22. new HttpHost("localhost", 9200))
  23. .setFailureListener(sniffOnFailureListener) //将失败侦听器设置为 RestClient 实例
  24. .build();
  25. Sniffer sniffer = Sniffer.builder(restClient)
  26. .setSniffAfterFailureDelayMillis(30000) //在嗅探失败时,不仅节点在每次失败后都会更新,而且还会比平常更早安排额外的嗅探轮次,默认情况下是在失败后一分钟,假设事情会恢复正常并且我们想要检测尽快地。可以在 Sniffer 创建时通过 setSniffAfterFailureDelayMillis 方法自定义所述间隔。请注意,如果如上所述未启用故障嗅探,则最后一个配置参数无效。
  27. .build();
  28. sniffOnFailureListener.setSniffer(sniffer); //将 Sniffer 实例设置为失败侦听器
  29. // 资源释放
  30. // Sniffer 对象应该与RestClient 具有相同的生命周期,并在客户端之前关闭。
  31. sniffer.close();
  32. restClient.close();

Spring Data Elasticsearch

Spring Data 的目的是用统一的接口,适配所有不同的存储类型。

Spring Data Elasticsearch是Spring Data的一个子项目,该项目旨在为新数据存储提供熟悉且一致的基于 Spring 的编程模型,同时保留特定于存储的功能和功能。Spring Data Elasticsearch是一个以 POJO 为中心的模型,用于与 Elastichsearch 文档交互并轻松编写 Repository 风格的数据访问层

特点
  • Spring 配置支持使用基于 Java 的@Configuration类或用于 ES 客户端实例的 XML 命名空间。
  • ElasticsearchTemplate提高执行常见 ES 操作的生产力的助手类。包括文档和 POJO 之间的集成对象映射。
  • 功能丰富的对象映射与 Spring 的转换服务集成
  • 基于注释的映射元数据但可扩展以支持其他元数据格式
  • Repository接口的自动实现,包括对自定义查找器方法的支持。
  • 对存储库的 CDI 支持

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  4. </dependency>

注解

@Document:在类级别应用,以指示该类是映射到数据库的候选类。最重要的属性包括:

indexName:用于存储此实体的索引的名称。它可以包含类似于“日志-#{T(java.time.LocalDate).now().toString()}”

type :映射类型。如果未设置,则使用该类的小写简单名称。(自4.0版起已弃用)

createIndex:标记是否在存储库引导时创建索引。默认值为true。请参阅自动创建带有相应映射的索引

versionType:版本管理的配置。默认值为外部 .

@Id:在字段级别应用,以标记用于标识的字段。

@Transient:默认情况下,存储或检索文档时,所有字段都映射到文档,此批注不包括该字段。

@PersistenceConstructor:标记在从数据库实例化对象时要使用的给定构造函数(甚至是包受保护的构造函数)。构造函数参数按名称映射到检索文档中的键值。

@Field:应用于字段级别并定义字段的属性,大多数属性映射到相应的Elasticsearch映射定义(以下列表不完整,请查看注释Javadoc以获取完整的参考):

name:将在Elasticsearch文档中表示的字段的名称,如果未设置,则使用Java字段名称。

type:字段类型,可以是Text,关键字,Long,Integer,Short,Byte,Double,Float,Half_Float,Scaled_Float,日期,日期Nanos,Boolean,Binary,Integer_Range,Float_Range,Long_Range,DoubleˉRange,DateˉRange,Object,Nested,Ip,TokenCount,percollator,flatten,搜索。请参阅Elasticsearch映射类型

format:一个或多个内置日期格式,请参阅下一节格式数据映射 .

pattern:一个或多个自定义日期格式,请参阅下一节格式数据映射 .

store:标志是否应将原始字段值存储在Elasticsearch中,默认值为假 .

analyzer ,搜索分析器 ,normalizer用于指定自定义分析器和规格化器。

@GeoPoint:将字段标记为地理点如果字段是GeoPoint班级

简单使用

  1. public class EsUtil {
  2. // 生成批量处理对象
  3. private static BulkRequest bulkRequest = new BulkRequest();
  4. /**
  5. * 添加数据到es
  6. * @param indexName
  7. * @param typeName
  8. * @param indexId
  9. * @param json
  10. */
  11. public static void add(RestHighLevelClient restHighLevelClient, String indexName, String typeName, String indexId, Map<String, Object> json) throws IOException {
  12. IndexRequest indexRequest = new IndexRequest(indexName, typeName,indexId);
  13. // Gson gson = new Gson();
  14. indexRequest.source(new JSONObject(json).toString(), XContentType.JSON);
  15. try {
  16. restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);
  17. } catch (IOException e) {
  18. // TODO Auto-generated catch block
  19. e.printStackTrace();
  20. }
  21. }
  22. /**
  23. * 判断索引名是否存在
  24. * @param indexName
  25. * @return
  26. */
  27. public static boolean existsIndex(RestHighLevelClient restHighLevelClient,String indexName) {
  28. try{
  29. GetIndexRequest request = new GetIndexRequest(indexName);
  30. boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
  31. return exists;
  32. }catch (Exception e){
  33. System.out.println("Exception");
  34. }
  35. return false;
  36. }
  37. /**
  38. * @param : client
  39. * @description : 判断文档是否存在
  40. */
  41. public static boolean isExist(RestHighLevelClient restHighLevelClient, String indexName, String typeName, String indexId) throws IOException{
  42. GetRequest request = new GetRequest(indexName, typeName, indexId);
  43. //1.同步判断
  44. boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
  45. //2.异步判断
  46. ActionListener<Boolean> listener = new ActionListener<Boolean>() {
  47. @Override
  48. public void onResponse(Boolean exists) {
  49. if (exists){
  50. System.out.println("文档存在");
  51. }else {
  52. System.out.println("文档不存在");
  53. }
  54. }
  55. @Override
  56. public void onFailure(Exception e) {
  57. }
  58. };
  59. //client.existsAsync(request, RequestOptions.DEFAULT, listener);
  60. return exists;
  61. }
  62. /**
  63. * @param : client
  64. * @description : 删除文档
  65. */
  66. public static void deleteDocument(RestHighLevelClient restHighLevelClient, String indexName, String typeName, String indexId) throws IOException{
  67. DeleteRequest request = new DeleteRequest(indexName,typeName,indexId);
  68. //设置请求超时时间:2分钟
  69. request.timeout(TimeValue.timeValueMinutes(2));
  70. //request.timeout("2m");
  71. //同步删除
  72. DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
  73. //异步删除
  74. ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
  75. @Override
  76. public void onResponse(DeleteResponse deleteResponse) {
  77. System.out.println("删除后操作");
  78. }
  79. @Override
  80. public void onFailure(Exception e) {
  81. System.out.println("删除失败");
  82. }
  83. };
  84. }
  85. /**
  86. * 批量增加数据的方法
  87. * @param restHighLevelClient
  88. * @param indexname
  89. * @param typename
  90. * @param row_key
  91. * @param map
  92. * @throws Exception
  93. */
  94. public void bulkadd(RestHighLevelClient restHighLevelClient, String indexname, String typename, String row_key, Map<String,Object> map) throws Exception {
  95. try {
  96. // 生成批量处理对象
  97. //BulkRequest bulkRequest = new BulkRequest();
  98. // 得到某一行的数据,并封装成索引对象
  99. IndexRequest indexRequest = new IndexRequest(indexname, typename,row_key);
  100. indexRequest.source(new JSONObject(map).toString(), XContentType.JSON);
  101. //判断是否执行加载
  102. if (bulkRequest.numberOfActions() != 0 && (bulkRequest.numberOfActions() > 100)) {
  103. try {
  104. bulkRequest(restHighLevelClient);
  105. } catch (Exception e) {
  106. e.printStackTrace();
  107. }
  108. }
  109. // 装填数据
  110. bulkRequest.add(indexRequest);
  111. } catch (Exception e) {
  112. e.printStackTrace();
  113. }finally {
  114. bulkRequest(restHighLevelClient);
  115. }
  116. }
  117. /**
  118. * 批量具体执行方法
  119. * execute bulk process
  120. * @throws Exception
  121. */
  122. private void bulkRequest(RestHighLevelClient restHighLevelClient) throws Exception {
  123. // 加载数据
  124. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  125. // 判断加载情况
  126. if(bulkResponse.hasFailures()){
  127. System.out.println("失败");
  128. }else{
  129. System.out.println("成功");
  130. // 重新定义
  131. bulkRequest = new BulkRequest();
  132. }
  133. }
  134. }

发表评论

表情:
评论列表 (有 0 条评论,108人围观)

还没有评论,来说两句吧...

相关阅读