java操作 elasticsearch进行增删改查

一时失言乱红尘 2022-05-26 11:20 383阅读 0赞

本文中所使用的 elasticsearch 版本是 5.1.1

elasticsearch版本不同,对其进行操作的API也不同。

具体可以详见elsticsearch官方文档,可以选择不同版本的javaapi,操作版本不同,其api也会有相应的差异

api见下面的网址

http://www.elastic.co/guide/en/elasticsearch/client/java-api/5.1/index.html

elasticsearch的启动和具体增删改语句见下面两篇博客:

安装: https://blog.csdn.net/baidu\_24545901/article/details/79019812

增删改查: https://blog.csdn.net/baidu\_24545901/article/details/79031291

接下来代码主要是 elasticsearch5.1.1版本的javaapi

  1. package com.csii.webservice.dao;
  2. import java.io.IOException;
  3. import java.net.InetAddress;
  4. import java.net.InetSocketAddress;
  5. import java.net.UnknownHostException;
  6. import java.util.Date;
  7. import java.util.concurrent.ExecutionException;
  8. import org.elasticsearch.action.ActionListener;
  9. import org.elasticsearch.action.bulk.BulkRequestBuilder;
  10. import org.elasticsearch.action.bulk.BulkResponse;
  11. import org.elasticsearch.action.delete.DeleteResponse;
  12. import org.elasticsearch.action.get.GetResponse;
  13. import org.elasticsearch.action.get.MultiGetItemResponse;
  14. import org.elasticsearch.action.get.MultiGetResponse;
  15. import org.elasticsearch.action.index.IndexRequest;
  16. import org.elasticsearch.action.index.IndexResponse;
  17. import org.elasticsearch.action.update.UpdateRequest;
  18. import org.elasticsearch.client.transport.TransportClient;
  19. import org.elasticsearch.common.settings.Settings;
  20. import org.elasticsearch.common.transport.InetSocketTransportAddress;
  21. import org.elasticsearch.common.xcontent.XContentBuilder;
  22. import org.elasticsearch.common.xcontent.XContentFactory;
  23. import org.elasticsearch.index.query.QueryBuilders;
  24. import org.elasticsearch.index.reindex.BulkIndexByScrollResponse;
  25. import org.elasticsearch.index.reindex.DeleteByQueryAction;
  26. import org.elasticsearch.script.Script;
  27. import org.elasticsearch.script.ScriptService;
  28. import org.elasticsearch.transport.client.PreBuiltTransportClient;
  29. import org.junit.Test;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. import org.springframework.stereotype.Repository;
  33. import com.csii.webservice.entiry.Message;
  34. @Repository
  35. public class MessageDao {
  36. public final static String host = "127.0.0.1";
  37. public final static int port = 9300;
  38. private Logger logger = LoggerFactory.getLogger(MessageDao.class);
  39. /*
  40. 这个是 elasticsearch的增加索引并且增加数据
  41. */
  42. public static void main1(String[] args) /*addIndex1()*/ {
  43. try{
  44. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  45. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  46. System.out.println("连接成功!!");
  47. IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
  48. .setSource(XContentFactory.jsonBuilder()
  49. .startObject()
  50. .field("user", "kimchy")
  51. .field("postDate", new Date())
  52. .field("message", "trying out Elasticsearch")
  53. .endObject()
  54. )
  55. .get();
  56. System.out.println(
  57. response.getId()+" "+
  58. response.getType()+" "+
  59. response.getVersion()
  60. );
  61. }catch(Exception e){
  62. e.printStackTrace();
  63. }
  64. }
  65. /*
  66. 这个是增加索引的第二种形式
  67. */
  68. public static void main2(String[] args) {
  69. try{
  70. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  71. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  72. System.out.println("连接成功!!");
  73. String json = "{" +
  74. "\"user\":\"kimchy\"," +
  75. "\"postDate\":\"2013-01-30\"," +
  76. "\"message\":\"trying out Elasticsearch\"" +
  77. "}";
  78. IndexResponse response = client.prepareIndex("twitter1", "tweet1")
  79. .setSource(json)
  80. .get();
  81. System.out.println(
  82. response.getId()+" "+
  83. response.getType()+" "+
  84. response.getVersion()
  85. );
  86. }catch(Exception e){
  87. e.printStackTrace();
  88. }
  89. }
  90. /*
  91. 获取es中数据的api
  92. */
  93. public static void main3(String[] args) {
  94. try{
  95. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  96. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  97. System.out.println("连接成功!!");
  98. GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
  99. System.out.println(response);
  100. GetResponse response1 = client.prepareGet("twitter", "tweet", "1")
  101. .setOperationThreaded(false)
  102. .get();
  103. System.out.println(response1);
  104. }catch(Exception e){
  105. e.printStackTrace();
  106. }
  107. }
  108. /*
  109. 测试es删除的 api, 删除的是json数据,索引不删除
  110. */
  111. public static void main4(String[] args) throws Exception {
  112. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  113. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  114. System.out.println("连接成功!!");
  115. DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
  116. System.out.println(response);
  117. }
  118. /*
  119. 删除elasticsearch当中的index中的数据
  120. */
  121. public static void main5(String[] args) throws Exception {
  122. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  123. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  124. System.out.println("连接成功!!");
  125. BulkIndexByScrollResponse response =
  126. DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
  127. .filter(QueryBuilders.matchQuery("title", "爬")) //先通过一个查询规则查询
  128. .source("twitter") //查询的 index
  129. .get(); //获取到查询的返回
  130. long deleted = response.getDeleted(); //获取到 一共删除的总条数
  131. System.out.println(deleted);
  132. }
  133. /*
  134. 删除es中的数据
  135. 同时在删除的时候加上了事件的监听
  136. */
  137. public static void main6(String[] args) throws Exception {
  138. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  139. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  140. DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
  141. .filter(QueryBuilders.matchQuery("title", "java"))
  142. .source("twitter")
  143. .execute(new ActionListener<BulkIndexByScrollResponse>() {
  144. public void onResponse(BulkIndexByScrollResponse response) {
  145. long deleted = response.getDeleted();
  146. System.out.println(deleted);
  147. }
  148. public void onFailure(Exception e) {
  149. // Handle the exception
  150. System.out.println("删除失败");
  151. }
  152. });
  153. }
  154. /*
  155. 执行es修改数据的操作
  156. */
  157. public static void main7(String[] args) throws Exception {
  158. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  159. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  160. UpdateRequest updateRequest = new UpdateRequest();
  161. updateRequest.index("twitter");
  162. updateRequest.type("tweet");
  163. updateRequest.id("1");
  164. updateRequest.doc(XContentFactory.jsonBuilder()
  165. .startObject()
  166. .field("salary_min", "16000")
  167. .endObject());
  168. client.update(updateRequest).get();
  169. }
  170. /*
  171. 执行es修改数据的操作
  172. 使用 client 的 prepareUpdate这个方法
  173. */
  174. public static void main8(String[] args) throws Exception {
  175. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  176. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  177. client.prepareUpdate("twitter", "tweet", "1")
  178. .setDoc(XContentFactory.jsonBuilder()
  179. .startObject()
  180. .field("salary_min", "17000")
  181. .endObject())
  182. .get();
  183. }
  184. /*
  185. 通过传json字符串的形式进行更新操作
  186. */
  187. public static void main9(String[] args) throws Exception, ExecutionException {
  188. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  189. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  190. UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
  191. .script(new Script("ctx._source.salary_min = \"19000\""));
  192. client.update(updateRequest).get();
  193. }
  194. /*
  195. 插入或者更新的操作
  196. 当id值记录存在的时候执行修改操作,
  197. 当id值不存在的时候执行插入操作
  198. */
  199. public static void main10(String[] args) throws Exception {
  200. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  201. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  202. IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "4")
  203. .source(XContentFactory.jsonBuilder()
  204. .startObject()
  205. .field("salary_min", "20000")
  206. .endObject());
  207. UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "4")
  208. .doc(XContentFactory.jsonBuilder()
  209. .startObject()
  210. .field("salary_min", "10000")
  211. .endObject())
  212. .upsert(indexRequest);
  213. client.update(updateRequest).get();
  214. }
  215. /*
  216. 多元化的get
  217. */
  218. public static void main(String[] args) throws Exception {
  219. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  220. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  221. MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
  222. .add("twitter", "tweet", "3") //通过单独id get
  223. .add("twitter", "tweet", "3", "4") //通过 list 的id get
  224. .add("twitter1", "tweet", "3") //也可以通过另外一个索引
  225. .get();
  226. for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
  227. GetResponse response = itemResponse.getResponse();
  228. if (response.isExists()) {
  229. String json = response.getSourceAsString();
  230. System.out.println("-------------------");
  231. System.out.println( json );
  232. }
  233. }
  234. }
  235. /*
  236. 通过bulk来操作 es,添加数据
  237. */
  238. public static void main12(String[] args) throws Exception {
  239. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
  240. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
  241. BulkRequestBuilder bulkRequest = client.prepareBulk();
  242. bulkRequest.add(client.prepareIndex("twitter", "tweet", "3")
  243. .setSource(XContentFactory.jsonBuilder()
  244. .startObject()
  245. .field("user", "kimchy")
  246. .field("postDate", new Date())
  247. .field("message", "trying out Elasticsearch")
  248. .endObject()
  249. )
  250. );
  251. bulkRequest.add(client.prepareIndex("twitter", "tweet", "5")
  252. .setSource(XContentFactory.jsonBuilder()
  253. .startObject()
  254. .field("user", "kimchy")
  255. .field("postDate", new Date())
  256. .field("message", "another post")
  257. .endObject()
  258. )
  259. );
  260. }
  261. public int addMessage(Message message) {
  262. // TODO Auto-generated method stub
  263. try {
  264. TransportClient client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddresses(
  265. new InetSocketTransportAddress(InetAddress.getByName(host),port));
  266. BulkIndexByScrollResponse response =
  267. DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
  268. .filter(QueryBuilders.matchQuery("title", "爬"))
  269. .source("twitter")
  270. .get();
  271. long deleted = response.getDeleted();
  272. System.out.println(deleted);
  273. } catch (UnknownHostException e) {
  274. // TODO Auto-generated catch block
  275. e.printStackTrace();
  276. }
  277. return 1;
  278. }
  279. }

发表评论

表情:
评论列表 (有 0 条评论,383人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Elasticsearch增删

    Elasticsearch的增删改查: 面向文档 document数据格式 1. 应用系统的数据结构都是面向对象的,复杂的 2. 对象数据存储到数据库中,只能拆

    相关 Elasticsearch增删

    面向文档 document数据格式 1. 应用系统的数据结构都是面向对象的,复杂的 2. 对象数据存储到数据库中,只能拆解开来,变为扁平的多张表,每次查询的时候还得