我就廢話不多說了,大家還是直接看代碼吧~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
/* *es配置類 * */ @Configuration public class ElasticSearchDataSourceConfigurer { private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer. class ); @Bean public TransportClient getESClient() { //設置集群名稱 Settings settings = Settings.builder().put( "cluster.name" , "bigData-cluster" ).put( "client.transport.sniff" , true ).build(); //創建client TransportClient client = null ; try { client = new PreBuiltTransportClient(settings) .addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName( "" ), 9300 )); //集群ip LOG.info( "ESClient連接建立成功" ); } catch (UnknownHostException e) { LOG.info( "ESClient連接建立失敗" ); e.printStackTrace(); } return client; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
/** * Simple to Introduction * * @Description: [添加類] */ @Repository public class UserDaoImpl implements userDao { private static final String INDEXNAME = "user" ; //小寫 private static final String TYPENAME = "info" ; @Resource TransportClient transportClient; @Override public int addUser(User[] user) { IndexResponse indexResponse = null ; int successNum = 0 ; for ( int i = 0 ; i < user.length; i++) { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); String jsonValue = null ; try { jsonValue = JsonUtil.object2JsonString(user[i]); if (jsonValue != null ) { indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue) .execute().actionGet(); successNum++; } } catch (JsonProcessingException e) { e.printStackTrace(); } } return successNum; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/** *批量插入 */ public static void bathAddUser(TransportClient client, List<User> users) { BulkRequestBuilder bulkRequest = transportClient.prepareBulk(); for ( int i = 0 ; i < users.size(); i++) { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); String jsonValue = null ; try { jsonValue = JsonUtil.object2JsonString(users.get(i)); } catch (JsonProcessingException e) { e.printStackTrace(); } bulkRequest.add(client.prepareIndex( "user" , "info" , str).setSource(jsonValue)); // 一萬條插入一次 if (i % 10000 == 0 ) { bulkRequest.execute().actionGet(); } System.out.println( "已經插入第" + i + "多少條" ); } } |
補充知識:使用java創建ES(ElasticSearch)連接池
1.首先要有一個創建連接的工廠類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package com.aly.util; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; /** * EliasticSearch連接池工廠對象 * @author 00000 * */ public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{ @Override public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception { System.out.println( "activateObject" ); } /** * 銷毀對象 */ @Override public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception { RestHighLevelClient highLevelClient = pooledObject.getObject(); highLevelClient.close(); } /** * 生產對象 */ // @SuppressWarnings({ "resource" }) @Override public PooledObject<RestHighLevelClient> makeObject() throws Exception { // Settings settings = Settings.builder().put("cluster.name","elasticsearch").build(); RestHighLevelClient client = null ; try { /*client = new PreBuiltTransportClient(settings) .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));*/ client = new RestHighLevelClient(RestClient.builder( new HttpHost( "192.168.1.121" , 9200 , "http" ), new HttpHost( "192.168.1.122" , 9200 , "http" ), new HttpHost( "192.168.1.123" , 9200 , "http" ), new HttpHost( "192.168.1.125" , 9200 , "http" ), new HttpHost( "192.168.1.126" , 9200 , "http" ), new HttpHost( "192.168.1.127" , 9200 , "http" ))); } catch (Exception e) { e.printStackTrace(); } return new DefaultPooledObject<RestHighLevelClient>(client); } @Override public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception { System.out.println( "passivateObject" ); } @Override public boolean validateObject(PooledObject<RestHighLevelClient> arg0) { return true ; } } |
2.然后再寫我們的連接池工具類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package com.aly.util; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.elasticsearch.client.RestHighLevelClient; /** * ElasticSearch 連接池工具類 * * @author 00000 * */ public class ElasticSearchPoolUtil { // 對象池配置類,不寫也可以,采用默認配置 private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); // 采用默認配置maxTotal是8,池中有8個client static { poolConfig.setMaxTotal( 8 ); } // 要池化的對象的工廠類,這個是我們要實現的類 private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory(); // 利用對象工廠類和配置類生成對象池 private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory, poolConfig); /** * 獲得對象 * * @return * @throws Exception */ public static RestHighLevelClient getClient() throws Exception { // 從池中取一個對象 RestHighLevelClient client = clientPool.borrowObject(); return client; } /** * 歸還對象 * * @param client */ public static void returnClient(RestHighLevelClient client) { // 使用完畢之后,歸還對象 clientPool.returnObject(client); } } |
以上這篇java連接ElasticSearch集群操作就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/baidu_16217779/article/details/71633284