
How do I properly close the low-level RestClient when using Elasticsearch 5.5.0 for best performance? (closing the Elasticsearch process)


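This article covers in detail how to properly close the low-level RestClient when using Elasticsearch 5.5.0 for best performance, and answers related questions about closing the Elasticsearch process. It also collects material on: how the Elasticsearch RestClient keeps persistent connections; ElasticSearch study notes 02 (integrating Spring Boot with JestClient to operate Elasticsearch); the Elasticsearch High Level REST Client; and the Elasticsearch Java clients (ES client overview, Java REST Client, Java Client, Spring Data Elasticsearch).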


How do I properly close the low-level RestClient when using Elasticsearch 5.5.0 for best performance? (closing the Elasticsearch process)

I am using a Spring Boot 1.5.4.RELEASE microservice that connects to an Elasticsearch 5.5.0 instance through the low-level REST client provided by Elasticsearch.

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
</parent>

<dependencies>
    <!-- Spring -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>5.5.0</version>
    </dependency>

    <!-- Apache Commons -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>

    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.8.9</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.8.9</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.8.9</version>
    </dependency>

    <!-- Log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <!-- JUnit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <!-- Swagger -->
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger2</artifactId>
        <version>2.6.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.springfox</groupId>
        <artifactId>springfox-swagger-ui</artifactId>
        <version>2.6.1</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

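Everything is set up correctly, but after a number of hits the client application reported an HTTP 500 error, and this is what appeared in the log file: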

java.io.IOException: Too many open files
        at sun.nio.ch.IOUtil.makePipe(Native Method) ~[na:1.8.0_141]
        at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65) ~[na:1.8.0_141]
        at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.8.0_141]
        at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.8.0_141]
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.<init>(AbstractMultiworkerIOReactor.java:142) ~[httpcore-nio-4.4.5.jar!/:4.4.5]
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.<init>(DefaultConnectingIOReactor.java:79) ~[httpcore-nio-4.4.5.jar!/:4.4.5]
        at org.apache.http.impl.nio.client.IOReactorUtils.create(IOReactorUtils.java:43) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:666) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:202) ~[rest-5.5.0.jar!/:5.5.0]
        at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:180) ~[rest-5.5.0.jar!/:5.5.0]
        at com.myapp.controller.SearchController.getSearchQueryResults(SearchController.java:94) ~[classes!/:1.0]

Inside SearchController (the second line after the // comment is line 94):

@RestController
@RequestMapping("/api/v1")
public class SearchController {

    @RequestMapping(value = "/search", method = RequestMethod.GET, produces = "application/json")
    public ResponseEntity<Object> getSearchQueryResults(@RequestParam(value = "criteria") String criteria) throws IOException {
        // Setup HTTP Headers
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", "application/json");

        // Setup RestClient
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
            .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    return requestConfigBuilder.setConnectTimeout(5000).setSocketTimeout(60000);
                }
            }).setMaxRetryTimeoutMillis(60000).build();

        // Setup query and send and return ResponseEntity...
    }
}

Clearly, the client was never closed after calling the restClient.performRequest() method...

So I put this into my code:

Response response = null;
try {
    // Submit Query and Obtain Response
    response = restClient.performRequest("POST", endPoint, Collections.singletonMap("pretty", "true"), entity);
}
catch (IOException e) {
    LOG.error("\n\n\tException: " + e + "\n\n");
    e.printStackTrace();
}
finally {
    restClient.close();
}
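The stack trace above fails in Selector.open(), reached from RestClientBuilder.build(), which suggests that every client built allocates selector file descriptors that are only released when the client is closed. Here is a minimal sketch of that failure mode using only JDK classes. The class and method names (SelectorLeakDemo, openAndClose, openAndLeak, countOpen) are hypothetical, and no Elasticsearch classes are involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

public class SelectorLeakDemo {

    /** Opens {@code count} selectors, closing each one right after use; returns how many remain open. */
    public static int openAndClose(int count) {
        List<Selector> seen = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // try-with-resources calls close() even if the body throws
            try (Selector selector = Selector.open()) {
                seen.add(selector);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return countOpen(seen);
    }

    /** Opens {@code count} selectors and never closes them, like a per-request client that is never closed. */
    public static List<Selector> openAndLeak(int count) {
        List<Selector> leaked = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                leaked.add(Selector.open());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return leaked;
    }

    /** Counts the selectors that still hold their underlying resources. */
    public static int countOpen(List<Selector> selectors) {
        int open = 0;
        for (Selector s : selectors) {
            if (s.isOpen()) {
                open++;
            }
        }
        return open;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open after try-with-resources: " + openAndClose(20));
        List<Selector> leaked = openAndLeak(20);
        System.out.println("open after leaking: " + countOpen(leaked));
        // Release the leaked selectors so the demo itself does not hold descriptors.
        for (Selector s : leaked) {
            s.close();
        }
    }
}
```

Closing in a finally block, as in the snippet above, stops the leak, but it still builds and tears down a client on every request.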

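I read in the Elasticsearch documentation that the RestClient class is thread-safe.

I also read about the restClient.performRequestAsync() method, but I have no experience with threads and the description in the documentation is vague.

Questions:

  1. Is my solution the best way to handle and close a large number of socket resources?

  2. I would appreciate it if someone could show me a better way to use the low-level RestClient with Elasticsearch, one that does not cause the same problem of socket resources not being released and resulting in an HTTP 500. Should I use restClient.performRequestAsync? Can someone provide an example?

Thanks for taking the time to read this...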

Answer 1


Creating a RestClient on every request is not good practice. You should create a single instance through a configuration bean like the one below:

@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private int port;

    @Bean
    public RestClient restClient() {
        return RestClient.builder(new HttpHost(host, port))
            .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                @Override
                public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                    return requestConfigBuilder.setConnectTimeout(5000).setSocketTimeout(60000);
                }
            }).setMaxRetryTimeoutMillis(60000).build();
    }
}

Then, in your SearchController class, you can inject it like this (a cleanup method has also been added to close the restClient instance when the container shuts down):

@RestController
@RequestMapping("/api/v1")
public class SearchController {

    @Autowired
    private RestClient restClient;

    @RequestMapping(value = "/search", method = RequestMethod.GET, produces = "application/json")
    public ResponseEntity<Object> getSearchQueryResults(@RequestParam(value = "criteria") String criteria) throws IOException {
        // Setup HTTP Headers
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", "application/json");

        // Setup query and send and return ResponseEntity...
        Response response = this.restClient.performRequest(...);
    }

    @PreDestroy
    public void cleanup() {
        try {
            logger.info("Closing the ES REST client");
            this.restClient.close();
        } catch (IOException ioe) {
            logger.error("Problem occurred when closing the ES REST client", ioe);
        }
    }
}
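The question also asked about performRequestAsync. That method reports its result through a listener with onSuccess and onFailure callbacks instead of returning a response, so no thread handling is needed in the calling code. Below is a rough sketch of how such a callback can be adapted to a CompletableFuture while keeping one shared client that is closed once. To stay runnable with the JDK alone it uses stand-ins: Listener and FakeAsyncClient are hypothetical types written for this illustration, not Elasticsearch classes, and the response is a plain String.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncCallbackSketch {

    /** Hypothetical stand-in for a callback-style response listener. */
    public interface Listener {
        void onSuccess(String body);
        void onFailure(Exception e);
    }

    /** Hypothetical stand-in for a long-lived async client: created once, shared, closed once. */
    public static final class FakeAsyncClient implements AutoCloseable {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        /** Returns immediately; the listener is invoked later on the worker thread. */
        public void performRequestAsync(String endpoint, Listener listener) {
            worker.submit(() -> {
                if (endpoint.startsWith("/")) {
                    listener.onSuccess("{\"endpoint\":\"" + endpoint + "\"}");
                } else {
                    listener.onFailure(new IllegalArgumentException("endpoint must start with /"));
                }
            });
        }

        @Override
        public void close() {
            // Lets queued callbacks finish, then stops the worker thread.
            worker.shutdown();
        }
    }

    /** Adapts the callback API to a CompletableFuture so callers can compose or wait on it. */
    public static CompletableFuture<String> request(FakeAsyncClient client, String endpoint) {
        CompletableFuture<String> future = new CompletableFuture<>();
        client.performRequestAsync(endpoint, new Listener() {
            @Override
            public void onSuccess(String body) {
                future.complete(body);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        // One client for the whole run; try-with-resources closes it exactly once.
        try (FakeAsyncClient client = new FakeAsyncClient()) {
            String body = request(client, "/_search").get(5, TimeUnit.SECONDS);
            System.out.println(body);
        }
    }
}
```

With the real client the same shape would apply: the injected singleton stays open for the lifetime of the application, and only the listener differs per request.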

How does the Elasticsearch RestClient keep persistent connections?

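In our production environment we use the official Elasticsearch REST client for index queries. We found that requests go over short-lived connections, which leaves many connections in the TIME_WAIT state. The official documentation lists persistent connections as one of the REST client's features. At first I assumed the HTTP request headers were missing Keep-Alive, but after adding it the connections were still short-lived. Our production Elasticsearch is version 5.2. How can this be configured so that requests use persistent connections?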

ElasticSearch - Study notes 02: integrating Spring Boot with JestClient to operate Elasticsearch

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.16.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.huarui</groupId>
    <artifactId>sb_elasticsearch_jestclient</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sb_elasticsearch_jestclient</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>5.6.7</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </pluginRepository>
    </pluginRepositories>

</project>
pom.xml

 

		<dependency>
			<groupId>io.searchbox</groupId>
			<artifactId>jest</artifactId>
			<version>5.3.3</version>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>5.6.7</version>
		</dependency>

 

spring.elasticsearch.jest.uris = http://192.168.79.129:9200/
spring.elasticsearch.jest.read-timeout = 10000
spring.elasticsearch.jest.username =
spring.elasticsearch.jest.password =

 

junit

 

import com.huarui.entity.User;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.mapping.GetMapping;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ElasticApplicationTests {

    private static String indexName = "userindex";
    private static String typeName = "user";

    @Autowired
    JestClient jestClient;

    /**
     * Insert a document.
     * @throws Exception
     */
    @Test
    public void insert() throws Exception {
        User user = new User(1L, "张三", 20, "张三是个Java开发工程师","2018-4-25 11:07:42");
        Index index = new Index.Builder(user).index(indexName).type(typeName).build();
        try{
            JestResult jr = jestClient.execute(index);
            System.out.println(jr.isSucceeded());
        }catch(IOException e){
            e.printStackTrace();
        }
    }

}

 

 

Elasticsearch High Level REST Client


Compatibility

The Java High Level REST Client requires Java 1.8 and depends on the Elasticsearch core project. The client version is the same as the Elasticsearch version that the client was developed for. It accepts the same request arguments as the TransportClient and returns the same response objects. See the Migration Guide if you need to migrate an application from TransportClient to the new REST client.

 

The High Level Client is guaranteed to be able to communicate with any Elasticsearch node running on the same major version and greater or equal minor version. It doesn’t need to be in the same minor version as the Elasticsearch nodes it communicates with, as it is forward compatible meaning that it supports communicating with later versions of Elasticsearch than the one it was developed for.

 

The 6.0 client is able to communicate with any 6.x Elasticsearch node, while the 6.1 client is for sure able to communicate with 6.1, 6.2 and any later 6.x version, but there may be incompatibility issues when communicating with a previous Elasticsearch node version, for instance between 6.1 and 6.0, in case the 6.1 client supports new request body fields for some APIs that are not known by the 6.0 node(s).
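The rule described above (same major version, node minor version greater than or equal to the client's) can be written as a small check. This is only an illustrative sketch: ClientCompatibility and isGuaranteedCompatible are hypothetical names, not part of any Elasticsearch API, and version strings are assumed to have the form major.minor or major.minor.patch.

```java
public class ClientCompatibility {

    /** Parses "major.minor[.patch]" into {major, minor}. */
    static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    /**
     * Returns true when the node is on the same major version as the client
     * and on an equal or later minor version, which is the guaranteed range.
     */
    public static boolean isGuaranteedCompatible(String clientVersion, String nodeVersion) {
        int[] client = majorMinor(clientVersion);
        int[] node = majorMinor(nodeVersion);
        return client[0] == node[0] && node[1] >= client[1];
    }

    public static void main(String[] args) {
        System.out.println(isGuaranteedCompatible("6.0.0", "6.3.1")); // true
        System.out.println(isGuaranteedCompatible("6.1.0", "6.0.0")); // false
        System.out.println(isGuaranteedCompatible("6.2.4", "5.2.2")); // false
    }
}
```

By this rule, the combination listed under Test Case below (a 6.2.4 client against ES 5.2.2) falls outside the guaranteed range.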

 

It is recommended to upgrade the High Level Client when upgrading the Elasticsearch cluster to a new major version, as REST API breaking changes may cause unexpected results depending on the node that is hit by the request, and newly added APIs will only be supported by the newer version of the client. The client should always be updated last, once all of the nodes in the cluster have been upgraded to the new major version.

Test Case

ES:5.2.2

POM

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.2.4</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.2.4</version>
</dependency>

Initialization

RestHighLevelClient instance needs a REST low-level client builder to be built as follows:

RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));

The high-level client will internally create the low-level client used to perform requests based on the provided builder. That low-level client maintains a pool of connections and starts some threads, so you should close the high-level client when you are well and truly done with it; it will in turn close the internal low-level client to free those resources. This can be done by calling close:

client.close();

In the rest of this documentation about the Java High Level Client, the RestHighLevelClient instance will be referenced as client.

Supported APIs

The Java High Level REST Client supports the following APIs:

Indices APIs
  • Create Index API
  • Delete Index API
  • Open Index API
  • Close Index API
Single document APIs
  • Index API
  • Get API
  • Delete API
  • Update API
Multi document APIs
  • Bulk API
Search APIs
  • Search API
  • Search Scroll API
  • Clear Scroll API
Miscellaneous APIs
  • Info API
package com.ftofs.esclient;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.ftofs.esclient.common.ESAnnotation.IndexAnnotation;
import com.ftofs.esclient.common.JsonHelper;
import com.ftofs.esclient.vo.CommentResultVo;
import com.ftofs.esclient.vo.ImResultVo;
import com.ftofs.esclient.vo.PostResultVo;
import com.ftofs.esclient.vo.SearchCommentResultVo;
import com.ftofs.esclient.vo.SearchImResultVo;
import com.ftofs.esclient.vo.SearchPostResultVo;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

@Component
public class  ESRestService {
    private RestHighLevelClient client = null;
    private RestClient lowClient = null;
    protected final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${esHost}")
    private String esHost;

    @Value("${esPort}")
    private int esPort;

    @Value("${esScheme}")
    private String esScheme;

    @Value("${esIndexImMessage}")
    private String esIndexImMessage;

    @Value("${esTypeImMessage}")
    private String esTypeImMessage;

    @Value("${esIndexWantPost}")
    private String esIndexWantPost;

    @Value("${esTypeWantPost}")
    private String esTypeWantPost;

    @Value("${esIndexWantComment}")
    private String esIndexWantComment;

    @Value("${esTypeWantComment}")
    private String esTypeWantComment;

    public void imReindexRange(List<ImResultVo> imResultVoList) {
        bulkInsert(esIndexImMessage, esTypeImMessage, imResultVoList);
    }

    public void postReindexRange(List<PostResultVo> postResultVoList) {
        bulkInsert(esIndexWantPost, esTypeWantPost, postResultVoList);
    }

    public void commentReindexRange(List<CommentResultVo> commentResultVoList) {
        bulkInsert(esIndexWantComment, esTypeWantComment, commentResultVoList);
    }

    public SearchCommentResultVo wantCommentSearch(HashMap<String, Object> queryMap) {
        StringBuilder entity = new StringBuilder();
        entity.append("{");
        entity.append(getWantCommentQueryPage(queryMap));
        entity.append("\"query\" : {\"bool\" : { \"must\" : [");
        entity.append(getWantCommentQuerySearch(queryMap));
        entity.append(getWantCommentQueryRange(queryMap));
        entity.append("]");
        entity.append(getWantCommentQueryFilter(queryMap));
        entity.append("}}");
        entity.append(getWantCommentQueryString(queryMap));
        entity.append(getWantCommentQuerySort(queryMap));
        entity.append(getWantCommentQueryAggs());
        entity.append("}");

        logger.info(entity.toString());

        HttpEntity httpEntity = new StringEntity(entity.toString(), "utf-8");
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);

        SearchCommentResultVo searchCommentResultVo = new SearchCommentResultVo();

        try {
            Response response = lowClient.performRequest(
                    "GET", "/" + esIndexWantComment + "/_search", new HashMap<>(), httpEntity, consumerFactory);

            String responseBody = EntityUtils.toString(response.getEntity());
            JsonNode jsonNode = JsonHelper.toJsonNode(responseBody);
            logger.info(responseBody);
            searchCommentResultVo.setTotal(jsonNode.get("hits").get("total").asInt());
            searchCommentResultVo.setResultList(getSearchCommentResultVoList(jsonNode.get("hits").get("hits")));
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }

        return searchCommentResultVo;
    }

    public SearchPostResultVo wantPostSearch(HashMap<String, Object> queryMap) {
        StringBuilder entity = new StringBuilder();
        entity.append("{");
        entity.append(getWantPostQueryPage(queryMap));
        entity.append("\"query\" : {\"bool\" : { \"must\" : [");
        entity.append(getWantPostQuerySearch(queryMap));
        entity.append(getWantPostQueryFilterPhrase(queryMap));
        entity.append(getWantPostQueryRange(queryMap));
        entity.append("]");
        entity.append(getWantPostQueryFilter(queryMap));
        entity.append("}}");
        entity.append(getWantPostQueryString(queryMap));
        entity.append(getWantPostQuerySort(queryMap));
        entity.append(getWantPostQueryAggs());
        entity.append(getWantPostQuerySource(queryMap));
        entity.append("}");

        logger.info(entity.toString());

        HttpEntity httpEntity = new StringEntity(entity.toString(), "utf-8");
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);

        SearchPostResultVo searchPostResultVo = new SearchPostResultVo();

        try {
            Response response = lowClient.performRequest(
                    "GET", "/" + esIndexWantPost + "/_search", new HashMap<>(), httpEntity, consumerFactory);

            String responseBody = EntityUtils.toString(response.getEntity());
            JsonNode jsonNode = JsonHelper.toJsonNode(responseBody);
            logger.info(responseBody);
            searchPostResultVo.setTotal(jsonNode.get("hits").get("total").asInt());
            searchPostResultVo.setResultList(getSearchPostResultVoList(jsonNode.get("hits").get("hits")));
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }

        return searchPostResultVo;
    }

    public SearchImResultVo imMessageSearch(HashMap<String, Object> queryMap) {
        // ES < 5.4.0 does not support the high-level query syntax
//        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//        sourceBuilder.query(QueryBuilders.termQuery("targetType", "users"));
//        sourceBuilder.query(QueryBuilders.termQuery("messageType", "txt"));
//        sourceBuilder.from(0);
//        sourceBuilder.size(5);
//        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
//
//        QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("targetType", "users")
//                .fuzziness(Fuzziness.AUTO)
//                .prefixLength(3)
//                .maxExpansions(10);
//
//        sourceBuilder.query(matchQueryBuilder);
//
//        SimpleQueryStringBuilder simpleQueryStringBuilder = new SimpleQueryStringBuilder("users").field("targetType");
//        sourceBuilder.query(simpleQueryStringBuilder);
//
//        QueryStringQueryBuilder queryStringQueryBuilder = new QueryStringQueryBuilder("users").field("targetType");
//        sourceBuilder.query(queryStringQueryBuilder);
//
//        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
//        sourceBuilder.query(queryBuilder);
//
//        SearchRequest searchRequest = new SearchRequest(esIndexImMessage);
//        searchRequest.types(esTypeImMessage);
//        searchRequest.source(sourceBuilder);
//
//        try {
//
//            SearchResponse searchResponse = client.search(searchRequest);
//            SearchHits hits = searchResponse.getHits();
//
//            long totalHits = hits.getTotalHits();
//            float maxScore = hits.getMaxScore();
//
//            SearchHit[] searchHits = hits.getHits();
//            for (SearchHit hit : searchHits) {
//                String index = hit.getIndex();
//                String type = hit.getType();
//                String id = hit.getId();
//                float score = hit.getScore();
//
//                // Read the _source field
//                String sourceAsString = hit.getSourceAsString(); // as a JSON string
//                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map
//                logger.info("totalHits:" + totalHits + " maxScore:" + maxScore + " index:" + index + "  type:" + type + "  id:" + id + " score:" + score);
//                logger.info(sourceAsString);
//            }
//
//        } catch (IOException e) {
//            e.printStackTrace();
//        }

        StringBuilder entity = new StringBuilder();
        entity.append("{");
        entity.append(getImMessageQueryPage(queryMap));
        entity.append("\"query\" : {\"bool\" : { \"must\" : [");
        entity.append(getImMessageQuerySearch(queryMap));
        entity.append(getImMessageQueryRange(queryMap));
        entity.append("]");
        entity.append(getImMessageQueryFilter(queryMap));
        entity.append("}}");
        entity.append(getImMessageQueryString(queryMap));
        entity.append(getImMessageQuerySort(queryMap));
        entity.append(getImMessageQueryAggs());
        entity.append("}");

        logger.info(entity.toString());

        //queryMap = Collections.singletonMap("pretty", "true");

        HttpEntity httpEntity = new StringEntity(entity.toString(), "utf-8");
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);

        SearchImResultVo searchImResultVo = new SearchImResultVo();

        try {
            Response response = lowClient.performRequest(
                    "GET", "/" + esIndexImMessage + "/_search", new HashMap<>(), httpEntity, consumerFactory);

            String responseBody = EntityUtils.toString(response.getEntity());
            JsonNode jsonNode = JsonHelper.toJsonNode(responseBody);
            logger.info(responseBody);
            searchImResultVo.setTotal(jsonNode.get("hits").get("total").asInt());
            searchImResultVo.setResultList(getSearchImResultVoList(jsonNode.get("hits").get("hits")));
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }

        return searchImResultVo;
    }

    private List<ImResultVo> getSearchImResultVoList(JsonNode hitsNode) {
        List<ImResultVo> searchImResultVoList = new ArrayList<>();
        if (hitsNode.isArray()) {
            for (JsonNode hit : hitsNode) {
                searchImResultVoList.add(JsonHelper.toGenericObject(hit.get("_source").toString(), new TypeReference<ImResultVo>() {
                }));
            }
        }
        return searchImResultVoList;
    }

    private List<PostResultVo> getSearchPostResultVoList(JsonNode hitsNode) {
        List<PostResultVo> searchPostResultList = new ArrayList<>();
        if (hitsNode.isArray()) {
            for (JsonNode hit : hitsNode) {
                searchPostResultList.add(JsonHelper.toGenericObject(hit.get("_source").toString(), new TypeReference<PostResultVo>() {
                }));
            }
        }
        return searchPostResultList;
    }

    private List<CommentResultVo> getSearchCommentResultVoList(JsonNode hitsNode) {
        List<CommentResultVo> searchCommentResultList = new ArrayList<>();
        if (hitsNode.isArray()) {
            for (JsonNode hit : hitsNode) {
                searchCommentResultList.add(JsonHelper.toGenericObject(hit.get("_source").toString(), new TypeReference<CommentResultVo>() {
                }));
            }
        }
        return searchCommentResultList;
    }


//    public void update(String index, String type, String id, HashMap<String, Object> jsonMap) {
//        UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonMap);
//        ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
//            @Override
//            public void onResponse(UpdateResponse updateResponse) {
//                // Called when the request succeeds
//                logger.info(String.format("index %s", updateResponse.getGetResult()));
//            }
//
//            @Override
//            public void onFailure(Exception e) {
//                // Called when the request fails
//                logger.error(e.getMessage());
//            }
//        };
//
//        client.updateAsync(updateRequest, listener);
//    }

    public <T> boolean bulkInsert(String index, String type, List<T> list) {
        boolean bulkInsertResult = false;

        BulkRequest request = new BulkRequest();
        request.timeout(TimeValue.timeValueMinutes(2));
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        request.waitForActiveShards(2);

        for (T t : list) {
            String indexField = t.getClass().getAnnotation(IndexAnnotation.class).field();
            try {
                request.add(new IndexRequest(index, type, t.getClass().getDeclaredField(indexField).get(t).toString()).source(JsonHelper.toJson(t), XContentType.JSON));
            } catch (NoSuchFieldException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }

        try {
            BulkResponse bulkResponse = client.bulk(request);
            if (!bulkResponse.hasFailures()) {
                bulkInsertResult = true;
            }
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }

        return bulkInsertResult;
    }

    public <T> void insertOrUpdate(String index, String type, T t) {
        try {
            String indexField = t.getClass().getAnnotation(IndexAnnotation.class).field();
            IndexRequest indexRequest = new IndexRequest(index, type, t.getClass().getDeclaredField(indexField).get(t).toString());
            indexRequest.source(JsonHelper.toJson(t), XContentType.JSON);
            ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                    // Called when the request succeeds
                    logger.info(String.format("%s %s %s", indexResponse.getIndex(), indexResponse.getId(), indexResponse.getResult()));
                }

                @Override
                public void onFailure(Exception e) {
                    // Called when the request fails.
                    logger.error(e.getMessage());
                }
            };

            client.indexAsync(indexRequest, listener);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // The annotated ID field is missing or not accessible.
            logger.error(e.getMessage(), e);
        }
    }

    public void createImMessageIndex() {
        // Delete the index first, then recreate it.
        deleteIndex(esIndexImMessage);
        createIndex(esIndexImMessage, esTypeImMessage, createImMessageMapping());
    }

    public void createWantPostIndex() {
        // Delete the index first, then recreate it.
        deleteIndex(esIndexWantPost);
        createIndex(esIndexWantPost, esTypeWantPost, createWantPostMapping());
    }

    public void createWantCommentIndex() {
        // Delete the index first, then recreate it.
        deleteIndex(esIndexWantComment);
        createIndex(esIndexWantComment, esTypeWantComment, createWantCommentMapping());
    }

    public boolean deleteImMessageDocument(String id) {
        return deleteDocument(esIndexImMessage, esTypeImMessage, id);
    }

    public boolean deleteWantPostDocument(String id) {
        return deleteDocument(esIndexWantPost, esTypeWantPost, id);
    }


    public boolean deleteWantCommentDocument(String id) {
        return deleteDocument(esIndexWantComment, esTypeWantComment, id);
    }

    protected String createWantCommentMapping() {
        String json = "";
        try {
            XContentBuilder mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(esTypeWantComment)
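                    .startObject("properties") // define the custom fields

                    .startObject("commentId") // field name
                    .field("type", "long") // data type
                    // Note: since Elasticsearch 5.0, "index" on non-string fields accepts only true/false;
                    // "analyzed"/"not_analyzed" is the legacy string-field syntax.
                    .field("index", "not_analyzed")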
                    .endObject()

                    .startObject("commentType")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("commentChannel")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("content")
                    .field("type", "text")
                    .field("index", "analyzed")
                    .endObject()

                    .startObject("deep")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("relateStoreId")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("relateCommonId")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("relatePostId")
                    .field("type", "long")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("replyCommentId")
                    .field("type", "long")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("createBy")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("createTime")
                    .field("type", "date")
                    .field("index", "not_analyzed")
                    .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                    .endObject()

                    .startObject("commentState")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("commentLike")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("commentUnlike")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("commentFavor")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("commentReply")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("commentShare")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("images")
                    .field("type", "nested")
                    .endObject()

                    .endObject()
                    .endObject()
                    .endObject();

            json = mapping.string();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return json;
    }

    protected String createWantPostMapping() {
        String json = "";
        try {
            XContentBuilder mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(esTypeWantPost)
                    .startObject("properties") // define the custom fields

                    .startObject("postId") // field name
                    .field("type", "long") // data type
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("title")
                    .field("type", "text")
                    .field("index", "analyzed")
                    .endObject()

                    .startObject("content")
                    .field("type", "text")
                    .field("index", "analyzed")
                    .endObject()

                    .startObject("postCategory")
                    .field("type", "keyword")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("coverImage")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postType")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postTag")
                    .field("type", "text")
                    .field("index", "analyzed")
                    .endObject()

                    .startObject("keyword")
                    .field("type", "text")
                    .field("index", "analyzed")
                    .endObject()

                    .startObject("createTime")
                    .field("type", "date")
                    .field("index", "not_analyzed")
                    .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                    .endObject()

                    .startObject("createBy")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("isPublish")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("isDelete")
                    .field("type", "byte")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("expiresDate")
                    .field("type", "date")
                    .field("index", "not_analyzed")
                    .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                    .endObject()

                    .startObject("budgetPrice")
                    .field("type", "scaled_float")
                    .field("scaling_factor", "10")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postView")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postLike")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postUnlike")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postFavor")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postReply")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("postShare")
                    .field("type", "integer")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("approveReason")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("comments")
                    .field("type", "nested")
                    .endObject()

                    .startObject("images")
                    .field("type", "nested")
                    .endObject()

                    .endObject()
                    .endObject()
                    .endObject();

            json = mapping.string();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return json;
    }

    protected String createImMessageMapping() {
        String json = "";
        try {
            XContentBuilder mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(esTypeImMessage)
                    .startObject("properties") // define the custom fields

                    .startObject("id") // field name
                    .field("type", "long") // data type
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("sendFrom")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("target")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("targetType")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("groupName")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("messageContent")
                    .field("type", "text")
                    .field("index", "analyzed")
                    .endObject()

                    .startObject("messageType")
                    .field("type", "text")
                    .field("index", "not_analyzed")
                    .endObject()

                    .startObject("sendTime")
                    .field("type", "date")
                    .field("index", "not_analyzed")
                    .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                    .endObject()

                    .endObject()
                    .endObject()
                    .endObject();

            json = mapping.string();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return json;
    }

    protected void createIndex(String index, String type, String source) {
        CreateIndexRequest request = new CreateIndexRequest(index); // create the index request
        // Each index can be created with its own settings.
        request.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 1)
        );
        // Create the document type mapping together with the index.
        request.mapping(type, // type name
                source, // mapping for this type, as a JSON string
                XContentType.JSON);

        // Optional arguments
        request.timeout(TimeValue.timeValueMinutes(2)); // timeout for all nodes to acknowledge the request (as a TimeValue)

        request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // timeout for connecting to the master node (as a TimeValue)

        request.waitForActiveShards(2); // number of active shard copies to wait for before the create index API responds, as an int

        ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) {
                // Called when the request succeeds.
                logger.info(String.format("create %s mapping %s", createIndexResponse.index(), createIndexResponse.isAcknowledged()));
            }

            @Override
            public void onFailure(Exception e) {
                // Called when the request fails.
                logger.error(e.getMessage());
            }
        };
        client.indices().createAsync(request, listener); // the CreateIndexRequest to execute and the ActionListener to call on completion
    }

    protected boolean deleteIndex(String index) {
        boolean acknowledged = false;
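        DeleteIndexRequest request = new DeleteIndexRequest(index); // name of the index to delete
        // Optional arguments:
        request.timeout(TimeValue.timeValueMinutes(2)); // timeout for all nodes to acknowledge the deletion (as a TimeValue)

        request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // timeout for connecting to the master node (as a TimeValue)

        // IndicesOptions controls how unavailable indices are resolved and how wildcard expressions are expanded.
        request.indicesOptions(IndicesOptions.lenientExpandOpen());

        try {
            // Synchronous execution
            DeleteIndexResponse deleteIndexResponse = client.indices().delete(request);
            // The returned DeleteIndexResponse exposes information about the executed operation:
            acknowledged = deleteIndexResponse.isAcknowledged(); // whether all nodes acknowledged the request
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                // The index to delete was not found; handle that case here if needed.
            } else {
                logger.error(e.getMessage(), e);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }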

        return acknowledged;
    }

    protected boolean deleteDocument(String index, String type, String id) {
        boolean deleted = false;
        DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
        // Optional arguments:
        deleteRequest.timeout(TimeValue.timeValueMinutes(2));
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

        try {
            // Synchronous execution
            DeleteResponse deleteResponse = client.delete(deleteRequest);
            // A NOT_FOUND result (no such document) leaves "deleted" as false.
            if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) {
                deleted = true;
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                // Version conflict: the document was modified concurrently.
                logger.warn(e.getMessage());
            } else {
                logger.error(e.getMessage(), e);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }

        return deleted;
    }

    private String getImMessageQueryPage(HashMap<String, Object> params) {
        int page = 0;
        int size = 0;

        if(params.containsKey("page")) {
            String paramPage = params.get("page").toString();
            if (!Strings.isEmpty(paramPage)) {
                page = Integer.valueOf(paramPage);
            }
        }

        if(params.containsKey("size")) {
            String paramSize = params.get("size").toString();
            if (!Strings.isEmpty(paramSize)) {
                size = Integer.valueOf(paramSize);
            }
        }

        if (page <= 0) {
            page = 1;
        }
        if (size <= 0) {
            size = 10;
        }

        int from = (page - 1) * size;
        return "\"from\":" + from + ",\"size\":" + size + ",";
    }

    private String getImMessageQuerySearch(HashMap<String, Object> params) {
        if (params.containsKey("messageContent")) {
            return "{\"match\": {\"messageContent\": \"" + params.get("messageContent") + "\"}}";
        }
        return "{\"match_all\":{}}";
    }

    private String getImMessageQueryRange(HashMap<String, Object> params) {
        String rangeString = "";
        if (params.containsKey("sendTimeFrom")) {
            rangeString += ",{\"range\": {\"sendTime\": {\"gte\": \"" + params.get("sendTimeFrom") + "\"}}}";
        }

        if (params.containsKey("sendTimeTo")) {
            rangeString += ",{\"range\": {\"sendTime\": {\"lte\": \"" + params.get("sendTimeTo") + "\"}}}";
        }

        if (params.containsKey("idFrom")) {
            rangeString += ",{\"range\": {\"id\": {\"gte\":" + params.get("idFrom") + "}}}";
        }

        if (params.containsKey("idTo")) {
            rangeString += ",{\"range\": {\"id\": {\"lte\":" + params.get("idTo") + "}}}";
        }

        return rangeString;
    }

    private String getImMessageQueryFilter(HashMap<String, Object> params) {
        String filterString = "";

        if (params.containsKey("id")) {
            filterString += ",{\"term\": { \"id\":" + params.get("id") + "}}";
        }

        if (params.containsKey("sendFrom")) {
            filterString += ",{\"term\": { \"sendFrom\":\"" + params.get("sendFrom") + "\"}}";
        }

        if (params.containsKey("groupName")) {
            filterString += ",{\"term\": { \"groupName\":\"" + params.get("groupName") + "\"}}";
        }

        if (params.containsKey("messageType")) {
            filterString += ",{\"term\": { \"messageType\":\"" + params.get("messageType") + "\"}}";
        }

        if (params.containsKey("target")) {
            filterString += ",{\"term\": { \"target\":\"" + params.get("target") + "\"}}";
        }

        if (params.containsKey("targetType")) {
            filterString += ",{\"term\": { \"targetType\":\"" + params.get("targetType") + "\"}}";
        }

        if (!filterString.equals("")) {
            filterString = ",\"filter\": [" + filterString.substring(1, filterString.length()) + "]";
        }

        return filterString;
    }

    private String getImMessageQuerySort(HashMap<String, Object> params) {
        if (params.containsKey("sort")) {
            return ",\"sort\": [{\"" + params.get("sort")
                    + "\":{\"order\": \"" + params.get("order")
                    + "\"}}, {\"id\": {\"order\": \"desc\"}}]";
        }
        return "";
    }

    private String getImMessageQueryAggs() {
        return ",\"aggs\": { }";
    }

    private String getImMessageQueryString(HashMap<String, Object> params) {
        if (params.containsKey("query")) {
            return ",\"query\": {\"query_string\": { \"query\":\"" + params.get("query") + "\"}}";
        }

        return "";
    }

    private String getWantPostQueryPage(HashMap<String, Object> params) {
        int page = 0;
        int size = 0;

        if(params.containsKey("page")) {
            String paramPage = params.get("page").toString();
            if (!Strings.isEmpty(paramPage)) {
                page = Integer.valueOf(paramPage);
            }
        }

        if(params.containsKey("size")) {
            String paramSize = params.get("size").toString();
            if (!Strings.isEmpty(paramSize)) {
                size = Integer.valueOf(paramSize);
            }
        }

        if (page <= 0) {
            page = 1;
        }
        if (size <= 0) {
            size = 10;
        }

        int from = (page - 1) * size;
        return "\"from\":" + from + ",\"size\":" + size + ",";
    }

    private String getWantPostQuerySearch(HashMap<String, Object> params) {
        if (params.containsKey("content")) {
            return "{\"match\": {\"content\": \"" + params.get("content") + "\"}}";
        }

        if (params.containsKey("title")) {
            return "{\"match\": {\"title\": \"" + params.get("title") + "\"}}";
        }

        return "{\"match_all\":{}}";
    }

    private String getWantPostQueryRange(HashMap<String, Object> params) {
        String rangeString = "";
        if (params.containsKey("createTimeFrom")) {
            rangeString += ",{\"range\": {\"createTime\": {\"gte\": \"" + params.get("createTimeFrom") + "\"}}}";
        }

        if (params.containsKey("createTimeTo")) {
            rangeString += ",{\"range\": {\"createTime\": {\"lte\": \"" + params.get("createTimeTo") + "\"}}}";
        }

        if (params.containsKey("expiresDateFrom")) {
            rangeString += ",{\"range\": {\"expiresDate\": {\"gte\": \"" + params.get("expiresDateFrom") + "\"}}}";
        }

        if (params.containsKey("expiresDateTo")) {
            rangeString += ",{\"range\": {\"expiresDate\": {\"lte\": \"" + params.get("expiresDateTo") + "\"}}}";
        }

        if (params.containsKey("budgetPriceFrom")) {
            rangeString += ",{\"range\": {\"budgetPrice\": {\"gte\": " + params.get("budgetPriceFrom") + "}}}";
        }

        if (params.containsKey("budgetPriceTo")) {
            rangeString += ",{\"range\": {\"budgetPrice\": {\"lte\": " + params.get("budgetPriceTo") + "}}}";
        }

        if (params.containsKey("postIdFrom")) {
            rangeString += ",{\"range\": {\"postId\": {\"gte\":" + params.get("postIdFrom") + "}}}";
        }

        if (params.containsKey("postIdTo")) {
            rangeString += ",{\"range\": {\"postId\": {\"lte\":" + params.get("postIdTo") + "}}}";
        }

        return rangeString;
    }

    private String getWantPostQueryFilterPhrase(HashMap<String, Object> params){
        String phraseString = "";
        if (params.containsKey("postTag")) {
            phraseString += ",{\"match_phrase\": { \"postTag\":\"" + params.get("postTag") + "\"}}";
        }

        if (params.containsKey("keyword")) {
            phraseString += ",{\"match_phrase\": { \"keyword\":\"" + params.get("keyword") + "\"}}";
        }

        return phraseString;
    }

    private String getWantPostQueryFilter(HashMap<String, Object> params) {
        String filterString = "";

        if (params.containsKey("postId")) {
            filterString += ",{\"term\": { \"postId\":" + params.get("postId") + "}}";
        }

        if (params.containsKey("postCategory")) {
            filterString += ",{\"term\": { \"postCategory\":\"" + params.get("postCategory") + "\"}}";
        }

        if (params.containsKey("postType")) {
            filterString += ",{\"term\": { \"postType\":\"" + params.get("postType") + "\"}}";
        }

        if (params.containsKey("createBy")) {
            filterString += ",{\"term\": { \"createBy\":\"" + params.get("createBy") + "\"}}";
        }

        if (params.containsKey("isPublish")) {
            filterString += ",{\"term\": { \"isPublish\":\"" + params.get("isPublish") + "\"}}";
        }

        if (params.containsKey("isDelete")) {
            filterString += ",{\"term\": { \"isDelete\":\"" + params.get("isDelete") + "\"}}";
        }

        if (!filterString.equals("")) {
            filterString = ",\"filter\": [" + filterString.substring(1, filterString.length()) + "]";
        }

        return filterString;
    }

    private String getWantPostQuerySort(HashMap<String, Object> params) {
        if (params.containsKey("sort")) {
            return ",\"sort\": [{\"" + params.get("sort")
                    + "\":{\"order\": \"" + params.get("order")
                    + "\"}}, {\"postId\": {\"order\": \"desc\"}}]";
        }
        return "";
    }

    private String getWantPostQueryAggs() {
        return ",\"aggs\": { }";
    }

    private String getWantPostQueryString(HashMap<String, Object> params) {
        if (params.containsKey("query")) {
            return ",\"query\": {\"query_string\": { \"query\":\"" + params.get("query") + "\"}}";
        }

        return "";
    }

    private String getWantPostQuerySource(HashMap<String, Object> params) {
        if (params.containsKey("source")) {
            return ",\"_source\": " + JsonHelper.toJson(params.get("source"));
        }
        return ",\"_source\": []";
    }


    private String getWantCommentQueryPage(HashMap<String, Object> params) {
        int page = 0;
        int size = 0;

        if(params.containsKey("page")) {
            String paramPage = params.get("page").toString();
            if (!Strings.isEmpty(paramPage)) {
                page = Integer.valueOf(paramPage);
            }
        }

        if(params.containsKey("size")) {
            String paramSize = params.get("size").toString();
            if (!Strings.isEmpty(paramSize)) {
                size = Integer.valueOf(paramSize);
            }
        }

        if (page <= 0) {
            page = 1;
        }
        if (size <= 0) {
            size = 10;
        }

        int from = (page - 1) * size;
        return "\"from\":" + from + ",\"size\":" + size + ",";
    }

    private String getWantCommentQuerySearch(HashMap<String, Object> params) {
        if (params.containsKey("content")) {
            return "{\"match\": {\"content\": \"" + params.get("content") + "\"}}";
        }
        return "{\"match_all\":{}}";
    }

    private String getWantCommentQueryRange(HashMap<String, Object> params) {
        String rangeString = "";
        if (params.containsKey("createTimeFrom")) {
            rangeString += ",{\"range\": {\"createTime\": {\"gte\": \"" + params.get("createTimeFrom") + "\"}}}";
        }

        if (params.containsKey("createTimeTo")) {
            rangeString += ",{\"range\": {\"createTime\": {\"lte\": \"" + params.get("createTimeTo") + "\"}}}";
        }

        if (params.containsKey("commentIdFrom")) {
            rangeString += ",{\"range\": {\"commentId\": {\"gte\":" + params.get("commentIdFrom") + "}}}";
        }

        if (params.containsKey("commentIdTo")) {
            rangeString += ",{\"range\": {\"commentId\": {\"lte\":" + params.get("commentIdTo") + "}}}";
        }

        return rangeString;
    }

    private String getWantCommentQueryFilter(HashMap<String, Object> params) {
        String filterString = "";

        if (params.containsKey("commentId")) {
            filterString += ",{\"term\": { \"commentId\":" + params.get("commentId") + "}}";
        }

        if (params.containsKey("commentType")) {
            filterString += ",{\"term\": { \"commentType\":" + params.get("commentType") + "}}";
        }

        if (params.containsKey("commentChannel")) {
            filterString += ",{\"term\": { \"commentChannel\":" + params.get("commentChannel") + "}}";
        }

        if (params.containsKey("deep")) {
            filterString += ",{\"term\": { \"deep\":" + params.get("deep") + "}}";
        }

        if (params.containsKey("relateStoreId")) {
            filterString += ",{\"term\": { \"relateStoreId\":" + params.get("relateStoreId") + "}}";
        }

        if (params.containsKey("relateCommonId")) {
            filterString += ",{\"term\": { \"relateCommonId\":" + params.get("relateCommonId") + "}}";
        }

        if (params.containsKey("relatePostId")) {
            filterString += ",{\"term\": { \"relatePostId\":" + params.get("relatePostId") + "}}";
        }

        if (params.containsKey("replyCommentId")) {
            filterString += ",{\"term\": { \"replyCommentId\":" + params.get("replyCommentId") + "}}";
        }

        if (params.containsKey("commentState")) {
            filterString += ",{\"term\": { \"commentState\":" + params.get("commentState") + "}}";
        }

        if (params.containsKey("createBy")) {
            filterString += ",{\"term\": { \"createBy\":\"" + params.get("createBy") + "\"}}";
        }

        if (!filterString.equals("")) {
            filterString = ",\"filter\": [" + filterString.substring(1, filterString.length()) + "]";
        }

        return filterString;
    }

    private String getWantCommentQuerySort(HashMap<String, Object> params) {
        if (params.containsKey("sort")) {
            return ",\"sort\": [{\"" + params.get("sort")
                    + "\":{\"order\": \"" + params.get("order")
                    + "\"}}, {\"commentId\": {\"order\": \"desc\"}}]";
        }
        return "";
    }

    private String getWantCommentQueryAggs() {
        return ",\"aggs\": { }";
    }

    private String getWantCommentQueryString(HashMap<String, Object> params) {
        if (params.containsKey("query")) {
            return ",\"query\": {\"query_string\": { \"query\":\"" + params.get("query") + "\"}}";
        }

        return "";
    }

    @PostConstruct
    protected void init() {
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(esHost, esPort, esScheme)));

        lowClient = RestClient.builder(
                new HttpHost(esHost, esPort, esScheme)).build();
    }

    @PreDestroy
    protected void destroy() {
        if (client != null) {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (lowClient != null) {
            try {
                lowClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
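
The query builders in the class above splice request parameters straight into JSON text, so a value containing a double quote or a backslash produces a malformed (or altered) request body. Below is a minimal sketch of escaping values before concatenation. The class `QueryJson` and its methods are hypothetical and not part of the original code; in a project that already depends on Jackson, serializing the query through `ObjectMapper` would be the more usual route.

```java
public class QueryJson {

    /** Escapes a raw value so it can be placed inside a JSON string literal. */
    public static String escape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must be written as \\uXXXX.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds {"match": {"<field>": "<value>"}} with both parts escaped. */
    public static String match(String field, Object value) {
        return "{\"match\": {\"" + escape(field) + "\": \""
                + escape(String.valueOf(value)) + "\"}}";
    }

    public static void main(String[] args) {
        // A search term containing quotes no longer breaks the JSON structure.
        System.out.println(match("messageContent", "say \"hi\""));
    }
}
```

With a helper like this, a method such as `getImMessageQuerySearch` could return `QueryJson.match("messageContent", params.get("messageContent"))` instead of concatenating the raw value.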
ESRestService
package com.ftofs.esclient.service;

import com.ftofs.esclient.dao.ImMessageLogRepository;
import com.ftofs.esclient.dao.WantCommentImageRepository;
import com.ftofs.esclient.dao.WantCommentRepository;
import com.ftofs.esclient.dao.WantPostImageRepository;
import com.ftofs.esclient.dao.WantPostRepository;
import com.ftofs.esclient.domain.ImMessageLog;
import com.ftofs.esclient.domain.WantComment;
import com.ftofs.esclient.domain.WantPost;
import com.ftofs.esclient.vo.CommentImageResultVo;
import com.ftofs.esclient.vo.CommentResultVo;
import com.ftofs.esclient.vo.ImResultVo;
import com.ftofs.esclient.vo.PostImageResultVo;
import com.ftofs.esclient.vo.PostResultVo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

@Component
public class ReindexService {
    @Autowired
    private WantPostRepository wantPostRepository;
    @Autowired
    private WantCommentRepository wantCommentRepository;
    @Autowired
    private WantPostImageRepository wantPostImageRepository;
    @Autowired
    private WantCommentImageRepository wantCommentImageRepository;
    @Autowired
    private ImMessageLogRepository imMessageLogRepository;

    private <T> String emptyToNull(T value) {
        if (value == null) {
            return null;
        } else {
            return String.valueOf(value);
        }
    }

    /**
     * Loads all IM message records for re-indexing.
     * @return one ImResultVo per stored IM message
     */
    public List<ImResultVo> getImMessageResultVoList() {
        SimpleDateFormat sdf =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<ImResultVo> imResultVoList = new ArrayList<>();
        List<ImMessageLog> imMessageLogList = imMessageLogRepository.findAll();
        imMessageLogList.forEach(x -> {
            ImResultVo imResultVo = new ImResultVo();
            imResultVo.setGroupName(x.getGroupName());
            imResultVo.setId(String.valueOf(x.getId()));
            imResultVo.setMessageContent(x.getMessageContent());
            imResultVo.setMessageType(x.getMessageType());
            imResultVo.setSendFrom(x.getSendFrom());
            imResultVo.setSendTime(sdf.format(x.getSendTime()));
            imResultVo.setTarget(x.getTarget());
            imResultVo.setTargetType(x.getTargetType());

            imResultVoList.add(imResultVo);
        });

        return imResultVoList;
    }
    /**
     * Loads all comment records for re-indexing.
     * @return one CommentResultVo per stored comment
     */
    public List<CommentResultVo> getCommentResultVoList() {
        SimpleDateFormat sdf =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<CommentResultVo> commentResultVoList = new ArrayList<>();
        List<WantComment> wantCommentList = wantCommentRepository.findAll();
        wantCommentList.forEach(x -> {
            CommentResultVo commentResultVo = new CommentResultVo();
            commentResultVo.setReplyCommentId(emptyToNull(x.getReplyCommentId()));
            commentResultVo.setRelateStoreId(emptyToNull(x.getRelateStoreId()));
            commentResultVo.setRelatePostId(emptyToNull(x.getRelatePostId()));
            commentResultVo.setRelateCommonId(emptyToNull(x.getRelateCommonId()));
            commentResultVo.setDeep(emptyToNull(x.getDeep()));
            commentResultVo.setCommentUnlike(emptyToNull(x.getCommentUnlike()));
            commentResultVo.setCommentType(emptyToNull(x.getCommentType()));
            commentResultVo.setCommentState(emptyToNull(x.getCommentState()));
            commentResultVo.setCommentShare(emptyToNull(x.getCommentShare()));
            commentResultVo.setCommentReply(emptyToNull(x.getCommentReply()));
            commentResultVo.setCommentLike(emptyToNull(x.getCommentLike()));
            commentResultVo.setCommentId(emptyToNull(x.getCommentId()));
            commentResultVo.setCommentFavor(emptyToNull(x.getCommentFavor()));
            commentResultVo.setCommentChannel(emptyToNull(x.getCommentChannel()));
            commentResultVo.setCreateTime(sdf.format(x.getCreateTime()));
            commentResultVo.setCreateBy(x.getCreateBy());
            commentResultVo.setContent(x.getContent());

            List<CommentImageResultVo> commentImages = new ArrayList<>();
            List<String> imageUrlList = wantCommentImageRepository.findWantCommentImagesByCommentId(x.getCommentId());
            imageUrlList.forEach(url -> {
                CommentImageResultVo commentImageResultVo = new CommentImageResultVo();
                commentImageResultVo.setImageUrl(url);
                commentImages.add(commentImageResultVo);
            });
            commentResultVo.setImages(commentImages);

            commentResultVoList.add(commentResultVo);
        });

        return commentResultVoList;
    }

    /**
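     * Reset all post records: loads every post (with its comments and images) and maps it to a PostResultVo for reindexing.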
     * @return
     */
    public List<PostResultVo> getPostResultVoList() {
        SimpleDateFormat sdfDateTime =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SimpleDateFormat sdfDate =  new SimpleDateFormat("yyyy-MM-dd");
        List<PostResultVo> postResultVoList = new ArrayList<>();
        List<WantPost> wantPostList = wantPostRepository.findAll();
        wantPostList.forEach(x -> {
            PostResultVo postResultVo = new PostResultVo();
            postResultVo.setBudgetPrice(emptyToNull(x.getBudgetPrice()));
            postResultVo.setContent(x.getContent());
            postResultVo.setCreateBy(x.getCreateBy());
            postResultVo.setCreateTime(sdfDateTime.format(x.getCreateTime()));
            postResultVo.setExpiresDate(sdfDate.format(x.getExpiresDate()));
            postResultVo.setIsDelete(emptyToNull(x.getIsDelete()));
            postResultVo.setIsPublish(emptyToNull(x.getIsPublish()));
            postResultVo.setKeyword(x.getKeyword());
            postResultVo.setPostCategory(x.getPostCategory());
            postResultVo.setPostId(emptyToNull(x.getPostId()));
            postResultVo.setPostTag(x.getPostTag());
            postResultVo.setPostType(emptyToNull(x.getPostType()));
            postResultVo.setPostFavor(emptyToNull(x.getPostFavor()));
            postResultVo.setPostLike(emptyToNull(x.getPostLike()));
            postResultVo.setPostUnlike(emptyToNull(x.getPostUnlike()));
            postResultVo.setPostShare(emptyToNull(x.getPostShare()));
            postResultVo.setPostReply(emptyToNull(x.getPostReply()));
            postResultVo.setPostView(emptyToNull(x.getPostView()));
            postResultVo.setTitle(x.getTitle());
            postResultVo.setCoverImage(x.getCoverImage());
            postResultVo.setApproveReason(x.getApproveReason());

            List<CommentResultVo> comments = new ArrayList<>();
            List<WantComment> wantCommentList = wantCommentRepository.findWantCommentsByPostId(x.getPostId());
            wantCommentList.forEach(y -> {
                CommentResultVo commentResultVo = new CommentResultVo();
                commentResultVo.setContent(y.getContent());
                commentResultVo.setCreateBy(y.getCreateBy());
                commentResultVo.setCreateTime(sdfDateTime.format(y.getCreateTime()));
                commentResultVo.setCommentChannel(emptyToNull(y.getCommentChannel()));
                commentResultVo.setCommentFavor(emptyToNull(y.getCommentFavor()));
                commentResultVo.setCommentId(emptyToNull(y.getCommentId()));
                commentResultVo.setCommentLike(emptyToNull(y.getCommentLike()));
                commentResultVo.setCommentReply(emptyToNull(y.getCommentReply()));
                commentResultVo.setCommentShare(emptyToNull(y.getCommentShare()));
                commentResultVo.setCommentState(emptyToNull(y.getCommentState()));
                commentResultVo.setCommentType(emptyToNull(y.getCommentType()));
                commentResultVo.setCommentUnlike(emptyToNull(y.getCommentUnlike()));
                commentResultVo.setDeep(emptyToNull(y.getDeep()));
                commentResultVo.setRelateCommonId(emptyToNull(y.getRelateCommonId()));
                commentResultVo.setRelatePostId(emptyToNull(y.getRelatePostId()));
                commentResultVo.setRelateStoreId(emptyToNull(y.getRelateStoreId()));
                commentResultVo.setReplyCommentId(emptyToNull(y.getReplyCommentId()));
                List<CommentImageResultVo> commentImages = new ArrayList<>();
                List<String> imageUrlList = wantCommentImageRepository.findWantCommentImagesByCommentId(y.getCommentId());
                imageUrlList.forEach(url -> {
                    CommentImageResultVo commentImageResultVo = new CommentImageResultVo();
                    commentImageResultVo.setImageUrl(url);
                    commentImages.add(commentImageResultVo);
                });
                commentResultVo.setImages(commentImages);
                comments.add(commentResultVo);
            });
            postResultVo.setComments(comments);

            List<PostImageResultVo> postImages = new ArrayList<>();
            List<String> imageUrlList = wantPostImageRepository.findWantPostImagesByPostId(x.getPostId());
            imageUrlList.forEach(url -> {
                PostImageResultVo postImageResultVo = new PostImageResultVo();
                postImageResultVo.setImageUrl(url);
                postImages.add(postImageResultVo);
            });
            postResultVo.setImages(postImages);

            postResultVoList.add(postResultVo);
        });

        return postResultVoList;
    }
}
ReindexService
package com.ftofs.esclient.common;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ESAnnotation {
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IndexAnnotation {
        String field();
    }
}
ESAnnotation
package com.ftofs.esclient.common;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

/**
 * JSON helper class
 */
public class JsonHelper {
    private static final Logger log = LoggerFactory.getLogger(JsonHelper.class);

    final static ObjectMapper objectMapper;

    static {
        objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        objectMapper.setTimeZone(TimeZone.getTimeZone("GMT+8"));
    }

    public static ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    /**
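     * Converts a JSON string to a generic Java object.
     * @param <T>        target type
     * @param jsonString JSON string
     * @param tr         TypeReference, for example: new TypeReference< List<FamousUser> >(){}
     * @return the deserialized object, or null if the input is empty or cannot be parsed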
     */
    public static <T> T toGenericObject(String jsonString, TypeReference<T> tr) {

        if (jsonString == null || "".equals(jsonString)) {
            return null;
        } else {
            try {
                return (T) objectMapper.readValue(jsonString, tr);
            } catch (Exception e) {
                log.warn(jsonString);
                log.warn("json error:" + e.getMessage());
            }
        }
        return null;
    }

    /**
     * Converts a JSON string to a Java object.
     * @param jsonString
     * @param c
     * @return
     */
    public static Object toObject(String jsonString, Class<?> c) {

        if (jsonString == null || "".equals(jsonString)) {
            return null; // empty input: return null rather than an empty String of the wrong type
        } else {
            try {
                return objectMapper.readValue(jsonString, c);
            } catch (Exception e) {
                log.warn("json error:" + e.getMessage());
            }
        }
        return null;
    }

    /**
     * Converts a Java object to a JSON string.
     * @param object Java object; may be a plain object, array, List, Map, etc.
     * @return JSON string
     */
    public static String toJson(Object object) {
        String jsonString = "";
        try {
            jsonString = objectMapper.writeValueAsString(object);
        } catch (Exception e) {
            log.warn("json error:" + e.getMessage());
        }
        return jsonString;
    }

    /**
     * Converts a JSON string to a JsonNode.
     * @param jsonString
     * @return
     */
    public static JsonNode toJsonNode(String jsonString) {
        try {
            return objectMapper.readTree(jsonString);
        } catch (IOException e) {
            log.warn("json error:" + e.getMessage(), e);
        }
        return null;
    }

}
JsonHelper
package com.ftofs.esclient.controller;

import com.ftofs.esclient.ESRestService;
import com.ftofs.esclient.service.ReindexService;
import com.ftofs.esclient.vo.CommentResultVo;
import com.ftofs.esclient.vo.ImResultVo;
import com.ftofs.esclient.vo.PostResultVo;
import com.ftofs.esclient.vo.SearchCommentResultVo;
import com.ftofs.esclient.vo.SearchImResultVo;
import com.ftofs.esclient.vo.SearchPostResultVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
@RequestMapping("")
@Api(tags="ESRestController")
public class ESRestController {
    @Autowired
    private ESRestService restService;

    @Autowired
    private ReindexService reindexService;

    @Value("${esIndexImMessage}")
    private String esIndexImMessage;

    @Value("${esTypeImMessage}")
    private String esTypeImMessage;

    @Value("${esIndexWantPost}")
    private String esIndexWantPost;

    @Value("${esTypeWantPost}")
    private String esTypeWantPost;

    @Value("${esIndexWantComment}")
    private String esIndexWantComment;

    @Value("${esTypeWantComment}")
    private String esTypeWantComment;

    @PostMapping("/im/init")
    @ApiOperation(value = "IM消息索引初始化")
    public void imMessageInit() {
        restService.createImMessageIndex();
    }

    @PostMapping("/im/delete/{id}")
    @ApiOperation(value = "刪除IM消息")
    public void deleteImMessageDocument(@PathVariable String id) {
        restService.deleteImMessageDocument(id);
    }

    @PostMapping("/post/init")
    @ApiOperation(value = "貼文索引初始化")
    public void wantPostInit() {
        restService.createWantPostIndex();
    }

    @PostMapping("/post/delete/{id}")
    @ApiOperation(value = "刪除貼文")
    public void deleteWantPostDocument(@PathVariable String id) {
        restService.deleteWantPostDocument(id);
    }

    @PostMapping("/comment/init")
    @ApiOperation(value = "評論索引初始化")
    public void wantCommentInit() {
        restService.createWantCommentIndex();
    }

    @PostMapping("/comment/delete/{id}")
    @ApiOperation(value = "刪除評論")
    public void deleteWantCommentDocument(@PathVariable String id) {
        restService.deleteWantCommentDocument(id);
    }

    @PostMapping("/im/reindexRange")
    @ApiOperation(value = "批量添加IM消息記錄")
    public void imReindexRange(@RequestBody List<ImResultVo> imResultVoList) {
        restService.imReindexRange(imResultVoList);
    }

    @PostMapping("/post/reindexRange")
    @ApiOperation(value = "批量添加貼文")
    public void postReindexRange(@RequestBody List<PostResultVo> postResultVoList) {
        restService.postReindexRange(postResultVoList);
    }

    @PostMapping("/comment/reindexRange")
    @ApiOperation(value = "批量添加評論")
    public void commentReindexRange(@RequestBody List<CommentResultVo> commentResultVoList) {
        restService.commentReindexRange(commentResultVoList);
    }

    @PostMapping("/im/insertOrUpdate")
    @ApiOperation(value = "添加IM消息")
    public void imMessageInsertOrUpdate(@RequestBody ImResultVo imResultVo) {
        restService.insertOrUpdate(esIndexImMessage, esTypeImMessage, imResultVo);
    }

    @PostMapping("/post/insertOrUpdate")
    @ApiOperation(value = "添加貼文")
    public void wantPostInsertOrUpdate(@RequestBody PostResultVo postResultVo) {
        restService.insertOrUpdate(esIndexWantPost, esTypeWantPost, postResultVo);
    }

    @PostMapping("/comment/insertOrUpdate")
    @ApiOperation(value = "添加評論")
    public void wantCommentInsertOrUpdate(@RequestBody CommentResultVo commentResultVo) {
        restService.insertOrUpdate(esIndexWantComment, esTypeWantComment, commentResultVo);
    }

    @GetMapping("/im/search")
    @ApiOperation(value = "IM消息搜索")
    public SearchImResultVo imMessageSearch(@RequestBody HashMap<String, Object> queryMap) {
        return restService.imMessageSearch(queryMap);
    }

    @GetMapping("/post/search")
    @ApiOperation(value = "貼文搜索")
    public SearchPostResultVo wantPostSearch(@RequestBody HashMap<String, Object> queryMap) {
        return restService.wantPostSearch(queryMap);
    }

    @GetMapping("/comment/search")
    @ApiOperation(value = "評論搜索")
    public SearchCommentResultVo wantCommentSearch(@RequestBody HashMap<String, Object> queryMap) {
        return restService.wantCommentSearch(queryMap);
    }

    @PostMapping("/im/reindexAll")
    @ApiOperation(value = "重置所有IM消息記錄")
    public void imReindexAll() {
        restService.imReindexRange(reindexService.getImMessageResultVoList());
    }

    @PostMapping("/post/reindexAll")
    @ApiOperation(value = "重置所有貼文記錄")
    public void postReindexAll() {
        restService.postReindexRange(reindexService.getPostResultVoList());
    }

    @PostMapping("/comment/reindexAll")
    @ApiOperation(value = "重置所有評論記錄")
    public void commentReindexAll() {
        restService.commentReindexRange(reindexService.getCommentResultVoList());
    }
}
ESRestController
package com.ftofs.esclient;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@EnableSwagger2
@Configuration
public class Swagger2 {
    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.ftofs.esclient.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("Elasticsearch Rest High/Low Level Client 接口文檔說明")
                .contact(new Contact("Nick.Chung", "http://cnblogs.com/ncore", "46638441@qq.com"))
                .description("简单优雅的Restful风格")
                .termsOfServiceUrl("https://github.com/mbhybird")
                .version("1.0")
                .build();
    }
}
Swagger2
package com.ftofs.esclient;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ESClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(ESClientApplication.class, args);
    }


}
ESClientApplication
server.port=9500
esHost = 127.0.0.1
#dev:192.168.5.30
#train:192.168.5.32
esPort = 9200
esScheme = http
esIndexImMessage = index_im_msg
esIndexWantPost = index_want_post
esIndexWantComment = index_want_comment
esTypeImMessage = im_msg
esTypeWantPost = want_post
esTypeWantComment = want_comment

spring.datasource.url = jdbc:mysql://192.168.5.30:3306/twdb?characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=123456

spring.jpa.database=mysql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=none
spring.jpa.hibernate.naming-strategy = org.hibernate.cfg.ImprovedNamingStrategy
spring.jpa.properties.hibernate.dialect= org.hibernate.dialect.MySQL5Dialect

#logging.level.root=WARN
##logging.level.org.springframework.web=INFO
##logging.file=/log/es.log
##logging.pattern.console=%d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n
##logging.pattern.file=%d{yyyy/MM/dd-HH:mm} [%thread] %-5level %logger- %msg%n
application.properties
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ftofs</groupId>
    <artifactId>esclient</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>esclient</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.2.4</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.2.4</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.2.4</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <pattern>org.apache.http</pattern>
                                    <shadedPattern>hidden.org.apache.http</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.logging</pattern>
                                    <shadedPattern>hidden.org.apache.logging</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.commons.codec</pattern>
                                    <shadedPattern>hidden.org.apache.commons.codec</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.apache.commons.logging</pattern>
                                    <shadedPattern>hidden.org.apache.commons.logging</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
pom.xml

Elasticsearch Java client (ES client overview, Java REST Client, Java Client, Spring Data Elasticsearch)


Elasticsearch series, part 7: ES Java clients - Elasticsearch Java client (ES client overview, Java REST Client, Java Client, Spring Data Elasticsearch)

I. ES Client Overview

1. Elasticsearch is a service with a client/server (C/S) architecture

 

2. Recap: the Elasticsearch architecture

 

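3. Client connection methods supported by Elasticsearch

3.1 REST API, port 9200

  This corresponds to the "RESTful style API" layer in the architecture diagram. The client talks to the cluster in RESTful style over HTTP.

3.2 Transport connection, port 9300

  This corresponds to the "Transport" layer in the architecture diagram. The client connects directly to Elasticsearch nodes over TCP.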

4. Elasticsearch provides clients for many programming languages

 

See the official site for details:

https://www.elastic.co/guide/en/elasticsearch/client/index.html

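II. Introduction to the Java REST Client

1. Elasticsearch provides two Java REST clients

Java Low Level REST Client: talks to the cluster over HTTP; the caller must marshal the request JSON and parse the response JSON itself. It is compatible with all Elasticsearch versions.
Java High Level REST Client: built on top of the low-level client; it adds APIs that marshal request JSON and parse response JSON. Its version must match the Elasticsearch server version, otherwise compatibility problems can occur.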
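With the low-level client, "marshalling the request JSON yourself" means producing the body string by hand. The following is a minimal sketch of that step using only the JDK; `MatchQueryJson`, `escape` and `matchQuery` are hypothetical helper names chosen for illustration and are not part of any Elasticsearch library. In a real project a JSON library such as Jackson would normally do this job.

```java
/**
 * Sketch: hand-building the JSON body that a low-level REST client would send.
 * All names here are hypothetical helpers, not Elasticsearch APIs.
 */
final class MatchQueryJson {

    /** Escapes the characters that must not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a 4-digit hex escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds a match query body of the form {"query":{"match":{field:text}}}. */
    static String matchQuery(String field, String text) {
        return "{\"query\":{\"match\":{\"" + escape(field) + "\":\"" + escape(text) + "\"}}}";
    }

    public static void main(String[] args) {
        // The resulting string is what would be passed as the request entity
        System.out.println(matchQuery("message", "trying out \"Elasticsearch\""));
    }
}
```

The high-level client exists largely to remove this kind of string handling: request objects are serialized and responses parsed for you.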

2. Java Low Level REST Client notes

Features, Maven setup and usage: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html

API doc: https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html

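3. Java High Level REST Client notes

Added in 5.6.0; its goal is to handle requests and responses in an object-oriented Java style.
Each API can be called synchronously or asynchronously. The synchronous method returns a result object directly; the asynchronous method has the suffix Async and reports the result through a listener argument.
The high-level Java REST client depends on the Elasticsearch core project.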
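The listener-based asynchronous style can be adapted to a `CompletableFuture` when callers prefer to compose results. The sketch below uses only the JDK: `ResultListener` is a hypothetical interface with the same two-callback shape (`onResponse` / `onFailure`), and `fakeIndexAsync` is a stand-in for a real `...Async` client method, so nothing here is an actual Elasticsearch API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ListenerBridgeDemo {

    /** Hypothetical callback with a success method and a failure method. */
    interface ResultListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Wraps any listener-style asynchronous call in a CompletableFuture. */
    static <T> CompletableFuture<T> toFuture(Consumer<ResultListener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new ResultListener<T>() {
            @Override public void onResponse(T response) { future.complete(response); }
            @Override public void onFailure(Exception e) { future.completeExceptionally(e); }
        });
        return future;
    }

    /** Stand-in for an *Async client method: reports its result from another thread. */
    static void fakeIndexAsync(String doc, ResultListener<String> listener) {
        new Thread(() -> {
            if (doc.isEmpty()) {
                listener.onFailure(new IllegalArgumentException("empty document"));
            } else {
                listener.onResponse("indexed:" + doc);
            }
        }).start();
    }

    public static void main(String[] args) {
        CompletableFuture<String> result =
                ListenerBridgeDemo.<String>toFuture(l -> fakeIndexAsync("doc-1", l));
        // join() blocks until the listener has been notified
        System.out.println(result.join());
    }
}
```

With a real client, the lambda passed to `toFuture` would call the corresponding asynchronous method and hand it an adapter from the library's listener type.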

Compatibility:

Requires Java 1.8 and the Elasticsearch core project.
Use a client version that matches the Elasticsearch server version.

4. Java High Level REST Client: Maven integration

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
</dependency>

5. Java High Level REST Client: initialization

RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));

Given the addresses of several cluster nodes, the client load-balances its requests across that set of nodes.
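One simple way to spread requests over a fixed set of nodes is round-robin selection. The sketch below illustrates that idea with JDK classes only; `RoundRobinHosts` is a hypothetical class written for this example and should not be read as the client's actual implementation, which also has to deal with nodes that stop responding.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: hands out configured hosts in rotation. */
final class RoundRobinHosts {
    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinHosts(String... hosts) {
        if (hosts.length == 0) {
            throw new IllegalArgumentException("at least one host is required");
        }
        // Copy the array so later changes by the caller do not affect the rotation
        this.hosts = Arrays.asList(hosts.clone());
    }

    /** Returns the next host; safe to call from several threads. */
    String nextHost() {
        // floorMod keeps the index non-negative even after the counter overflows
        int i = Math.floorMod(next.getAndIncrement(), hosts.size());
        return hosts.get(i);
    }

    public static void main(String[] args) {
        RoundRobinHosts rr = new RoundRobinHosts("localhost:9200", "localhost:9201");
        for (int n = 0; n < 4; n++) {
            System.out.println(rr.nextHost());
        }
    }
}
```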

When the client is no longer needed, remember to close it:

client.close();
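Since the client is closeable, a common pattern is to create one instance, reuse it for many requests, and close it exactly once when the application (or the demo program) is finished, rather than opening and closing a client per request. The sketch below shows that lifecycle with try-with-resources; `FakeClient` is a hypothetical stand-in that only records calls, so the example runs without an Elasticsearch dependency.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

final class ClientLifecycleDemo {

    /** Hypothetical stand-in for a REST client; it only records what happens to it. */
    static final class FakeClient implements Closeable {
        final List<String> log = new ArrayList<>();
        private boolean closed;

        String request(String path) {
            if (closed) {
                throw new IllegalStateException("client already closed");
            }
            log.add("GET " + path);
            return "ok";
        }

        @Override
        public void close() {
            closed = true;
            log.add("close");
        }
    }

    /** Reuses one client for several requests and closes it exactly once. */
    static List<String> run() {
        FakeClient client = new FakeClient();
        try (FakeClient c = client) {
            c.request("/_cluster/health");
            c.request("/mess/_search");
        } // close() is called here, even if a request throws
        return client.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a long-running service such as a Spring Boot application, the same idea usually means holding a single client as a singleton bean and closing it in a shutdown hook or destroy method; that wiring is not shown here.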

For the APIs and usage examples, see:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html

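III. Java High Level REST Client usage examples

Preparation:

Before writing the examples, add the Java client whose version matches the Elasticsearch server to the Maven project.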

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
</dependency>

Given the addresses of several cluster nodes, the client load-balances its requests across them:

InitDemo.java

package com.study.es_hrset_client;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * 
 * @Description: Obtain a Java High Level REST Client instance
 * @author lgs
 * @date 2018-06-23
 *
 */
public class InitDemo {

    public static RestHighLevelClient getClient() {

        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http"),
                        new HttpHost("localhost", 9201, "http")));

        return client;
    }
}

Tip: when using the ES client, relate each call to the equivalent operation performed earlier in Kibana; this makes the client easier to learn.

1. Create index

CreateIndexDemo.java

package com.study.es_hrset_client;

import java.io.IOException;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * 
 * @Description: Create an index
 * @author lgs
 * @date 2018-06-23
 *
 */
public class CreateIndexDemo {

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {

            // 1. Build the create-index request; the argument is the index name, "mess"
            CreateIndexRequest request = new CreateIndexRequest("mess");

            // 2. Configure the index settings
            request.settings(Settings.builder().put("index.number_of_shards", 3) // number of primary shards
                    .put("index.number_of_replicas", 2) // number of replicas
                    .put("analysis.analyzer.default.tokenizer", "ik_smart") // default tokenizer (requires the IK analysis plugin)
            );

            // 3. Define the index mappings
            request.mapping("_doc",
                    "  {\n" +
                    "    \"_doc\": {\n" +
                    "      \"properties\": {\n" +
                    "        \"message\": {\n" +
                    "          \"type\": \"text\"\n" +
                    "        }\n" +
                    "      }\n" +
                    "    }\n" +
                    "  }",
                    XContentType.JSON);

            // 4. Set an alias for the index
            request.alias(new Alias("mmm"));

            // 5. Send the request
            // 5.1 Synchronous call
            CreateIndexResponse createIndexResponse = client.indices()
                    .create(request);

            // 6. Handle the response
            boolean acknowledged = createIndexResponse.isAcknowledged();
            boolean shardsAcknowledged = createIndexResponse
                    .isShardsAcknowledged();
            System.out.println("acknowledged = " + acknowledged);
            System.out.println("shardsAcknowledged = " + shardsAcknowledged);

            // 5.2 Asynchronous alternative
            /*ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(
                        CreateIndexResponse createIndexResponse) {
                    // 6. Handle the response
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    boolean shardsAcknowledged = createIndexResponse
                            .isShardsAcknowledged();
                    System.out.println("acknowledged = " + acknowledged);
                    System.out.println(
                            "shardsAcknowledged = " + shardsAcknowledged);
                }

                @Override
                public void onFailure(Exception e) {
                    System.out.println("Failed to create index: " + e.getMessage());
                }
            };

            client.indices().createAsync(request, listener);
            */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Output:

acknowledged = true
shardsAcknowledged = true

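2. Index document

Indexing a document means putting document data into an index. It is similar to inserting a row into a database table: one row corresponds to one document.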

 IndexDocumentDemo.java

package com.study.es_hrset_client;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

/**
 * 
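 * @Description: Index a document, i.e. put document data into an index. This is similar to inserting a row into a database table; one row corresponds to one document.
 * @author lgs
 * @date 2018-06-23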
 *
 */
public class IndexDocumentDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            // 1. Build the index request
            IndexRequest request = new IndexRequest(
                    "mess",   // index
                    "_doc",     // mapping type
                    "1");     // document id
            
            // 2. Prepare the document data
            // Option 1: pass a JSON string directly
            String jsonString = "{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
                    "}";
            request.source(jsonString, XContentType.JSON); 
            
            // Option 2: represent the document as a Map
            /*
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            request.source(jsonMap); 
            */
            
            // Option 3: build the document with XContentBuilder
            /*
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.field("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            request.source(builder); 
            */
            
            // Option 4: pass key-value pairs directly
            /*
            request.source("user", "kimchy",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");
            */
            
            // 3. Other optional settings
            /*
            request.routing("routing");  // set the routing value
            request.timeout(TimeValue.timeValueSeconds(1));  // how long to wait for the primary shard
            request.setRefreshPolicy("wait_for");  // set the refresh policy
            request.version(2);  // set the version
            request.opType(DocWriteRequest.OpType.CREATE);  // operation type
            */
            
            // 4. Send the request
            IndexResponse indexResponse = null;
            try {
                // Synchronous call
                indexResponse = client.index(request);
            } catch(ElasticsearchException e) {
                // Catch and handle the exception.
                // Check for a version conflict, or a create on a document that already exists
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("Conflict; put your conflict-handling logic here!\n" + e.getDetailedMessage());
                }

                logger.error("Indexing failed", e);
            }
            
            // 5. Handle the response
            if(indexResponse != null) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("Document created; put your handling logic here.");
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("Document updated; put your handling logic here.");
                }
                // Shard-level information
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    // Fewer shards succeeded than the total; handle that case here
                }
                // If any replica shard failed, the failure reasons are available
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                        System.out.println("Replica failure reason: " + reason);
                    }
                }
            }


            // Send the index request asynchronously
            /*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {

                }

                @Override
                public void onFailure(Exception e) {

                }
            };
            client.indexAsync(request, listener);
            */

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 Output:

Document created; put your handling logic here.
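
The created, updated, and conflict outcomes handled in the demo can be pictured with a toy in-memory model. The sketch below is only an illustration under simplifying assumptions: the class `IndexResultSketch`, its `Result` enum, and the `createOnly` flag are hypothetical names invented here, and nothing in it calls Elasticsearch. It assumes that the first write of an id yields version 1, that each overwrite increments the version by one, and that a create-only write on an existing id is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of index results; NOT the Elasticsearch implementation.
class IndexResultSketch {

    enum Result { CREATED, UPDATED, CONFLICT }

    // Current version per document id (hypothetical storage for this sketch).
    private final Map<String, Long> versions = new HashMap<>();

    // Index a document; createOnly plays the role of opType(CREATE).
    Result index(String id, boolean createOnly) {
        Long current = versions.get(id);
        if (current == null) {
            versions.put(id, 1L);       // first write: version 1
            return Result.CREATED;
        }
        if (createOnly) {
            return Result.CONFLICT;     // create on an existing id is rejected
        }
        versions.put(id, current + 1);  // overwrite: version is incremented
        return Result.UPDATED;
    }

    // Returns 0 when the id has never been indexed.
    long version(String id) {
        return versions.getOrDefault(id, 0L);
    }

    public static void main(String[] args) {
        IndexResultSketch store = new IndexResultSketch();
        System.out.println(store.index("1", false)); // CREATED
        System.out.println(store.index("1", false)); // UPDATED
        System.out.println(store.index("1", true));  // CONFLICT
        System.out.println(store.version("1"));      // 2
    }
}
```

Running the demo class twice against a real cluster would follow the same pattern: the first run takes the created branch and the second run takes the updated branch.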

3. get document

 Retrieve document data

 GetDocumentDemo.java

package com.study.es_hrset_client;

import java.io.IOException;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

/**
 * 
 * @Description: Retrieve document data
 * @author lgs
 * @date June 23, 2018
 *
 */
public class GetDocumentDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            // 1. Create the get request
            GetRequest request = new GetRequest(
                    "mess",   // index
                    "_doc",   // mapping type
                    "1");     // document id

            // 2. Optional settings
            //request.routing("routing");
            //request.version(2);

            //request.fetchSourceContext(new FetchSourceContext(false)); // whether to fetch the _source field
            // Choose which fields to return
            String[] includes = new String[]{"message", "*Date"};
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);

            // Alternatively, exclude specific fields instead
            /*String[] includes = Strings.EMPTY_ARRAY;
            String[] excludes = new String[]{"message"};
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);*/


            // Retrieve stored fields
            /*request.storedFields("message");
            GetResponse getResponse = client.get(request);
            String message = getResponse.getField("message").getValue();*/


            // 3. Send the request
            GetResponse getResponse = null;
            try {
                // Synchronous call
                getResponse = client.get(request);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    logger.error("No document found with this id");
                }
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("Version conflict on get; put your conflict-handling logic here!");
                }
                logger.error("Failed to get the document", e);
            }
            
            // 4. Handle the response
            if(getResponse != null) {
                String index = getResponse.getIndex();
                String type = getResponse.getType();
                String id = getResponse.getId();
                if (getResponse.isExists()) { // the document exists
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString(); // source as a String
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();  // source as a Map
                    byte[] sourceAsBytes = getResponse.getSourceAsBytes();    // source as a byte array

                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info(sourceAsString);

                } else {
                    logger.error("No document found with this id");
                }
            }


            // Send the get request asynchronously
            /*
            ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse getResponse) {
                    
                }
            
                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.getAsync(request, listener);
            */
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
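
The include and exclude arrays passed to `FetchSourceContext` in the demo act as wildcard filters on field names. The following is a minimal sketch of that idea, assuming a flat document and treating only `*` as special; `SourceFilterSketch` and its methods are hypothetical helpers written for illustration, not part of the Elasticsearch client, and real source filtering also handles nested paths.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Simplified model of include/exclude source filtering on top-level fields.
class SourceFilterSketch {

    // Match a field name against a pattern where '*' matches any run of characters.
    static boolean matches(String pattern, String field) {
        String regex = Pattern.quote(pattern).replace("*", "\\E.*\\Q");
        return field.matches(regex);
    }

    static boolean anyMatch(String[] patterns, String field) {
        for (String p : patterns) {
            if (matches(p, field)) {
                return true;
            }
        }
        return false;
    }

    // Assumption of this sketch: an empty include list keeps everything,
    // and an exclude match always removes the field.
    static Map<String, Object> filter(Map<String, Object> source,
                                      String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            boolean included = includes.length == 0 || anyMatch(includes, e.getKey());
            boolean excluded = anyMatch(excludes, e.getKey());
            if (included && !excluded) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");

        // Same patterns as the demo: keep "message" and anything ending in "Date".
        System.out.println(filter(source, new String[]{"message", "*Date"}, new String[0]));
        // prints {postDate=2013-01-30, message=trying out Elasticsearch}
    }
}
```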

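 4. Bulk

 Index documents in bulk, i.e. put many documents into an index in one request. This is similar to inserting multiple rows into a database table in a batch, where one row is one document.

 BulkDemo.java
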
package com.study.es_hrset_client;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

/**
 * 
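 * @Description: Index documents in bulk, i.e. put many documents into an index in one request. This is similar to inserting multiple rows into a database table in a batch, where one row is one document.
 * @author lgs
 * @date June 23, 2018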
 *
 */
public class BulkDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            
            // 1. Create the bulk request
            BulkRequest request = new BulkRequest(); 
            request.add(new IndexRequest("mess", "_doc", "1")  
                    .source(XContentType.JSON,"field", "foo"));
            request.add(new IndexRequest("mess", "_doc", "2")  
                    .source(XContentType.JSON,"field", "bar"));
            request.add(new IndexRequest("mess", "_doc", "3")  
                    .source(XContentType.JSON,"field", "baz"));
            
            /*
            request.add(new DeleteRequest("mess", "_doc", "3")); 
            request.add(new UpdateRequest("mess", "_doc", "2") 
                    .doc(XContentType.JSON,"other", "test"));
            request.add(new IndexRequest("mess", "_doc", "4")  
                    .source(XContentType.JSON,"field", "baz"));
            */
            
            // 2. Optional settings
            /*
            request.timeout("2m");
            request.setRefreshPolicy("wait_for");  
            request.waitForActiveShards(2);
            */
            
            
            // 3. Send the request

            // Synchronous call
            BulkResponse bulkResponse = client.bulk(request);


            // 4. Handle the response
            if(bulkResponse != null) {
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    // A failed item carries no response object, so check for failure first
                    if (bulkItemResponse.isFailed()) {
                        logger.error("Bulk item failed: " + bulkItemResponse.getFailureMessage());
                        continue;
                    }
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        //TODO handle a successful index/create

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        //TODO handle a successful update

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        //TODO handle a successful delete
                    }
                }
            }
            
            
            // Send the bulk request asynchronously
            /*
            ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkResponse) {
                    
                }
            
                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.bulkAsync(request, listener);
            */
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
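
At the REST level, a bulk request body is newline-delimited JSON: one action line followed by one source line per indexed document, with a trailing newline at the end. The sketch below builds such a body for the three index operations from the demo using plain string concatenation. `BulkBodySketch` and its methods are hypothetical names used only for illustration, and the code assumes that none of the values need JSON escaping.

```java
// Builds a _bulk request body as newline-delimited JSON (no client library involved).
class BulkBodySketch {

    // One index action: a metadata line followed by the document source line.
    // Assumption: index, type, id and sourceJson contain no characters that need escaping.
    static String indexAction(String index, String type, String id, String sourceJson) {
        return "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n"
                + sourceJson + "\n";
    }

    // Body equivalent to the three IndexRequest objects added in the demo.
    static String buildBody() {
        StringBuilder body = new StringBuilder();
        body.append(indexAction("mess", "_doc", "1", "{\"field\":\"foo\"}"));
        body.append(indexAction("mess", "_doc", "2", "{\"field\":\"bar\"}"));
        body.append(indexAction("mess", "_doc", "3", "{\"field\":\"baz\"}"));
        return body.toString();
    }

    public static void main(String[] args) {
        // Six lines in total: an action line and a source line for each document.
        System.out.print(buildBody());
    }
}
```

A body like this could be sent with the low-level client to the `_bulk` endpoint; the high-level `BulkRequest` in the demo takes care of the serialization for you.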

 5. search

 Search for data

 SearchDemo.java

package com.study.es_hrset_client;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.term.TermSuggestion;

/**
 * 
 * @Description: Search for data
 * @author lgs
 * @date June 23, 2018
 *
 */
public class SearchDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            
            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank");
            searchRequest.types("_doc");

            // 2. Build the request body with SearchSourceBuilder; its methods cover the various kinds of queries.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            // Build the QueryBuilder
            /*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                    .fuzziness(Fuzziness.AUTO)
                    .prefixLength(3)
                    .maxExpansions(10);
            sourceBuilder.query(matchQueryBuilder);*/
            
            sourceBuilder.query(QueryBuilders.termQuery("age", 24)); 
            sourceBuilder.from(0); 
            sourceBuilder.size(10); 
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); 
            
            // Whether to return the _source field
            //sourceBuilder.fetchSource(false);

            // Choose which fields to return
            /*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
            String[] excludeFields = new String[] {"_type"};
            sourceBuilder.fetchSource(includeFields, excludeFields);*/

            // Sorting
            //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
            //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));

            // Return profiling information
            //sourceBuilder.profile(true);

            // Attach the request body to the request
            searchRequest.source(sourceBuilder);

            // Optional settings
            //searchRequest.routing("routing");

            // Highlighting
            /*
            HighlightBuilder highlightBuilder = new HighlightBuilder(); 
            HighlightBuilder.Field highlightTitle =
                    new HighlightBuilder.Field("title"); 
            highlightTitle.highlighterType("unified");  
            highlightBuilder.field(highlightTitle);  
            HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
            highlightBuilder.field(highlightUser);
            sourceBuilder.highlighter(highlightBuilder);*/
            
            
            // Add an aggregation
            /*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
                    .field("company.keyword");
            aggregation.subAggregation(AggregationBuilders.avg("average_age")
                    .field("age"));
            sourceBuilder.aggregation(aggregation);*/
            
            // Add a suggestion
            /*SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy");
                SuggestBuilder suggestBuilder = new SuggestBuilder();
                suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);*/

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);


            // 4. Handle the response
            // Status information about the search
            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();

            // Per-shard search information
            int totalShards = searchResponse.getTotalShards();
            int successfulShards = searchResponse.getSuccessfulShards();
            int failedShards = searchResponse.getFailedShards();
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                // failures should be handled here
            }

            // Process the search hits
            SearchHits hits = searchResponse.getHits();
            
            long totalHits = hits.getTotalHits();
            float maxScore = hits.getMaxScore();
            
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit
                
                String index = hit.getIndex();
                String type = hit.getType();
                String id = hit.getId();
                float score = hit.getScore();
                
                // Read the _source field
                String sourceAsString = hit.getSourceAsString(); // as a JSON string
                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map
                // Read individual field values from the map
                /*
                String documentTitle = (String) sourceAsMap.get("title");
                List<Object> users = (List<Object>) sourceAsMap.get("user");
                Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                */
                logger.info("index:" + index + "  type:" + type + "  id:" + id);
                logger.info(sourceAsString);

                // Read the highlight results
                /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField highlight = highlightFields.get("title"); 
                Text[] fragments = highlight.fragments();  
                String fragmentString = fragments[0].string();*/
            }
            
            // Read the aggregation results
            /*
            Aggregations aggregations = searchResponse.getAggregations();
            Terms byCompanyAggregation = aggregations.get("by_company"); 
            Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic"); 
            Avg averageAge = elasticBucket.getAggregations().get("average_age"); 
            double avg = averageAge.getValue();
            */
            
            // Read the suggestion results
            /*Suggest suggest = searchResponse.getSuggest(); 
            TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user"); 
            for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { 
                for (TermSuggestion.Entry.Option option : entry) { 
                    String suggestText = option.getText().string();
                }
            }
            */
            
            // Send the search request asynchronously
            /*
            ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                @Override
                public void onResponse(SearchResponse response) {
                    // handle the result
                }

                @Override
                public void onFailure(Exception e) {
                    // handle the failure
                }
            };
            client.searchAsync(searchRequest, listener); 
            */
            
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
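
The `from` and `size` settings used in the demo implement offset-based paging. As a small sketch of the arithmetic, the hypothetical helper below (`PagingSketch` is not part of any library) converts a 1-based page number into the `from` offset and derives the page count from the total hit count.

```java
// Offset arithmetic for from/size paging; page numbers start at 1 in this sketch.
class PagingSketch {

    // Offset of the first hit on the given page.
    static int fromOffset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        return (page - 1) * size;
    }

    // Number of pages needed to show totalHits results (rounded up).
    static int totalPages(long totalHits, int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1");
        }
        return (int) ((totalHits + size - 1) / size);
    }

    public static void main(String[] args) {
        int size = 10;
        System.out.println(fromOffset(1, size));   // 0, as in the demo
        System.out.println(fromOffset(3, size));   // 20
        System.out.println(totalPages(25, size));  // 3
    }
}
```

The computed offset would be passed to `sourceBuilder.from(...)` together with `sourceBuilder.size(size)`, and the value returned by `hits.getTotalHits()` would feed `totalPages`.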

 6. highlight

HighlightDemo.java

package com.study.es_hrset_client;

import java.io.IOException;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;

/**
 * 
 * @Description: Highlighting
 * @author lgs
 * @date June 23, 2018
 *
 */
public class HighlightDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            
            // 1. Create the search request
            SearchRequest searchRequest = new SearchRequest("hl_test");

            // 2. Build the request body with SearchSourceBuilder; its methods cover the various kinds of queries.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            // Build the QueryBuilder
            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "lucene solr");
            sourceBuilder.query(matchQueryBuilder);

            // Paging
            /*sourceBuilder.from(0);
            sourceBuilder.size(5);*/


            // Highlighting
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.requireFieldMatch(false).field("title").field("content")
                .preTags("<strong>").postTags("</strong>");
            // Each field can have its own settings, e.g. different tags
            /*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title"); 
            highlightTitle.preTags("<strong>").postTags("</strong>");
            highlightBuilder.field(highlightTitle);  
            HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
            highlightContent.preTags("<b>").postTags("</b>");
            highlightBuilder.field(highlightContent).requireFieldMatch(false);*/
            
            sourceBuilder.highlighter(highlightBuilder);
            
            searchRequest.source(sourceBuilder);
            
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);


            // 4. Handle the response
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Process the search hits
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();
                
                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {        
                    String index = hit.getIndex();
                    String type = hit.getType();
                    String id = hit.getId();
                    float score = hit.getScore();
                    
                    // Read the _source field
                    //String sourceAsString = hit.getSourceAsString(); // as a JSON string
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map
                    // Read individual field values from the map
                    /*String title = (String) sourceAsMap.get("title");
                    String content  = (String) sourceAsMap.get("content"); */
                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info("sourceMap : " +  sourceAsMap);
                    // Read the highlight results
                    Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                    HighlightField highlight = highlightFields.get("title");
                    if(highlight != null) {
                        Text[] fragments = highlight.fragments();  // a multi-valued field yields several fragments
                        if(fragments != null && fragments.length > 0) {
                            String fragmentString = fragments[0].string();
                            logger.info("title highlight : " +  fragmentString);
                            // The highlighted string can replace the matching field in sourceAsMap before returning to the caller
                            //sourceAsMap.put("title", fragmentString);
                        }
                    }

                    highlight = highlightFields.get("content");
                    if(highlight != null) {
                        Text[] fragments = highlight.fragments();  // a multi-valued field yields several fragments
                        if(fragments != null && fragments.length > 0) {
                            String fragmentString = fragments[0].string();
                            logger.info("content highlight : " +  fragmentString);
                            // The highlighted string can replace the matching field in sourceAsMap before returning to the caller
                            //sourceAsMap.put("content", fragmentString);
                        }
                    }
                }
            }
            
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
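
The commented-out `sourceAsMap.put(...)` lines in the demo hint at a common pattern: overlay the first highlight fragment of each field onto the source map before handing the hit to the caller. Here is a minimal sketch of that merge step using plain collections. `HighlightMergeSketch` is a hypothetical helper, and the highlight fragments are modeled as lists of strings rather than the client's `HighlightField` and `Text` types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Overlays highlight fragments onto a copy of the document source.
class HighlightMergeSketch {

    // For each highlighted field, replace the source value with the first fragment.
    // Fields with no fragments keep their original value.
    static Map<String, Object> merge(Map<String, Object> source,
                                     Map<String, List<String>> highlights) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        for (Map.Entry<String, List<String>> e : highlights.entrySet()) {
            List<String> fragments = e.getValue();
            if (fragments != null && !fragments.isEmpty()) {
                merged.put(e.getKey(), fragments.get(0));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "lucene solr and elasticsearch");
        source.put("content", "plain text");

        Map<String, List<String>> highlights = new LinkedHashMap<>();
        highlights.put("title",
                Arrays.asList("<strong>lucene</strong> <strong>solr</strong> and elasticsearch"));
        highlights.put("content", Collections.<String>emptyList());

        // title is replaced by its highlighted fragment, content is left unchanged
        System.out.println(merge(source, highlights));
    }
}
```

Copying the source into a new map keeps the original hit data untouched, which is useful if the unhighlighted values are still needed elsewhere.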

 7. suggest (query suggestions)

SuggestDemo.java

package com.study.es_hrset_client;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestion;

/**
 * 
 * @Description: Query suggestions
 * @author lgs
 * @date June 23, 2018
 *
 */
public class SuggestDemo {
    
    private static Logger logger = LogManager.getRootLogger();  
    
    // Term suggestion (spell check): detect a misspelled term and recommend the correct one, e.g. appel -> apple
    public static void termSuggest() {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            
            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("mess");

            // 2. Build the request body with SearchSourceBuilder; its methods cover the various kinds of queries.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            sourceBuilder.size(0);

            // Add a suggestion
            // Term suggestion
            SuggestionBuilder<?> termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy");
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);

            searchRequest.source(sourceBuilder);

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);


            // 4. Handle the response
            // Status information about the search
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Read the suggestion results
                Suggest suggest = searchResponse.getSuggest(); 
                TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user"); 
                for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { 
                    logger.info("text: " + entry.getText().string());
                    for (TermSuggestion.Entry.Option option : entry) { 
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }
            /*
              "suggest": {
                "my-suggestion": [
                  {
                    "text": "tring",
                    "offset": 0,
                    "length": 5,
                    "options": [
                      {
                        "text": "trying",
                        "score": 0.8,
                        "freq": 1
                      }
                    ]
                  },
                  {
                    "text": "out",
                    "offset": 6,
                    "length": 3,
                    "options": []
                  },
                  {
                    "text": "elasticsearch",
                    "offset": 10,
                    "length": 13,
                    "options": []
                  }
                ]
              }*/

        } catch (IOException e) {
            logger.error(e);
        }
    }
    
    // Auto-completion: propose likely words or phrases based on what the user has typed so far
    public static void completionSuggester() {
        try (RestHighLevelClient client = InitDemo.getClient();) {

            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("music");

            // 2. Build the request body with SearchSourceBuilder; its methods cover the various kinds of queries.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            sourceBuilder.size(0);

            // Add a suggestion
            // Auto-completion
            /*POST music/_search?pretty
                    {
                        "suggest": {
                            "song-suggest" : {
                                "prefix" : "lucene s", 
                                "completion" : { 
                                    "field" : "suggest" ,
                                    "skip_duplicates": true
                                }
                            }
                        }
                    }*/

            SuggestionBuilder<?> completionSuggestionBuilder =
                    SuggestBuilders.completionSuggestion("suggest").prefix("lucene s")
                    .skipDuplicates(true);
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("song-suggest", completionSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);

            searchRequest.source(sourceBuilder);

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);


            // 4. Handle the response
            // Status information about the search
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Read the suggestion results
                Suggest suggest = searchResponse.getSuggest();
                CompletionSuggestion completionSuggestion = suggest.getSuggestion("song-suggest");
                for (CompletionSuggestion.Entry entry : completionSuggestion.getEntries()) {
                    logger.info("text: " + entry.getText().string());
                    for (CompletionSuggestion.Entry.Option option : entry) {
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }

        } catch (IOException e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        termSuggest();
        
        logger.info("--------------------------------------");
        
        completionSuggester();
    }
}
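
To make the idea behind prefix completion with duplicate skipping more concrete, here is a toy version built on a sorted set. It is a sketch under loud assumptions: `CompletionSketch` is a hypothetical class, matching is case-sensitive, and results come back in alphabetical order, whereas the real completion suggester works on an analyzed, weighted index structure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Toy prefix completion over an in-memory list of inputs.
class CompletionSketch {

    // Returns up to 'limit' distinct inputs that start with 'prefix', in sorted order.
    static List<String> complete(Collection<String> inputs, String prefix, int limit) {
        // A TreeSet sorts the inputs and drops duplicates (the "skip duplicates" part).
        TreeSet<String> sorted = new TreeSet<>(inputs);
        List<String> options = new ArrayList<>();
        // tailSet(prefix) starts at the first entry >= prefix, so all matches are contiguous.
        for (String candidate : sorted.tailSet(prefix)) {
            if (!candidate.startsWith(prefix) || options.size() == limit) {
                break;
            }
            options.add(candidate);
        }
        return options;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList(
                "lucene solr", "lucene so cool", "lucene elasticsearch",
                "lucene solr", "elasticsearch");
        System.out.println(complete(inputs, "lucene s", 5));
        // prints [lucene so cool, lucene solr]
    }
}
```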

 8. aggregation (aggregation analysis)

 AggregationDemo.java

package com.study.es_hrset_client;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;

/**
 * 
 * @Description: Aggregation analysis
 * @author lgs
 * @date June 23, 2018
 *
 */
public class AggregationDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient();) {
            
            // 1、创建search请求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank"); 
            
            // 2. Build the request body with SearchSourceBuilder; its methods cover all the query options.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
             
            sourceBuilder.size(0); 

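            // Add the aggregation:
            // a terms aggregation that groups documents by the value of "age",
            // with buckets ordered by ascending average_balance
            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                    .field("age").order(BucketOrder.aggregation("average_balance", true));
            // Compute the average balance of each group as a sub-aggregation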
            aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                    .field("balance"));
            sourceBuilder.aggregation(aggregation);
            
            searchRequest.source(sourceBuilder);
            
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);
                
            // 4. Handle the response
            // Check the status of the search
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Get the aggregation results
                Aggregations aggregations = searchResponse.getAggregations();
                Terms byAgeAggregation = aggregations.get("by_age"); 
                logger.info("aggregation by_age results");
                logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                logger.info("------------------------------------");
                for(Bucket buck : byAgeAggregation.getBuckets()) {
                    logger.info("key: " + buck.getKeyAsNumber());
                    logger.info("docCount: " + buck.getDocCount());
                    logger.info("docCountError: " + buck.getDocCountError());
                    // Get the sub-aggregation
                    Avg averageBalance = buck.getAggregations().get("average_balance"); 

                    logger.info("average_balance: " + averageBalance.getValue());
                    logger.info("------------------------------------");
                }
                // Fetch a single bucket directly by its key
                /*Bucket bucket24 = byAgeAggregation.getBucketByKey("24"); 
                Avg averageBalance24 = bucket24.getAggregations().get("average_balance"); 
                double avg = averageBalance24.getValue();*/
                
            }
            
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
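
To make the semantics of this request concrete, the sketch below reproduces the same computation in plain Java over a small in-memory list: group by age, average the balance within each group, and order the groups by ascending average. The `Account` class and the sample values are hypothetical; this only illustrates what the aggregation computes, not how Elasticsearch executes it across shards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TermsAvgSketch {

    // Hypothetical stand-in for a document in the "bank" index.
    static final class Account {
        final int age;
        final double balance;

        Account(int age, double balance) {
            this.age = age;
            this.balance = balance;
        }
    }

    // Groups accounts by age, averages the balance per group, and returns the
    // (age, average) entries sorted by ascending average, which mirrors
    // BucketOrder.aggregation("average_balance", true).
    static List<Map.Entry<Integer, Double>> averageBalanceByAge(List<Account> accounts) {
        Map<Integer, Double> avgByAge = accounts.stream()
                .collect(Collectors.groupingBy(a -> a.age,
                        Collectors.averagingDouble(a -> a.balance)));
        List<Map.Entry<Integer, Double>> buckets = new ArrayList<>(avgByAge.entrySet());
        buckets.sort(Map.Entry.comparingByValue());
        return buckets;
    }

    public static void main(String[] args) {
        List<Account> accounts = Arrays.asList(
                new Account(24, 1000.0), new Account(24, 3000.0),
                new Account(30, 500.0), new Account(30, 1500.0),
                new Account(41, 4000.0));
        for (Map.Entry<Integer, Double> bucket : averageBalanceByAge(accounts)) {
            System.out.println("key: " + bucket.getKey()
                    + ", average_balance: " + bucket.getValue());
        }
    }
}
```

Each returned entry plays the role of one bucket in the `by_age` result, with the entry value corresponding to the `average_balance` sub-aggregation.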

 9. Official documentation

QueryBuilder for each query type:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html

AggregationBuilder for each aggregation type:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html

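IV. Java Client

1. About the Java Client

The Java client uses TransportClient. All of its operations are asynchronous by nature: you can either pass a listener or work with the returned Future.
Note: according to the Elasticsearch roadmap, TransportClient is deprecated starting with 7.0 and removed entirely in 8.0, to be replaced by the High Level REST Client.
Most of the operation APIs in the High Level REST Client are the same as in the Java client.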
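
The two calling styles mentioned here (wait on a future, or register a listener) can be illustrated without Elasticsearch at all. The sketch below uses the JDK's `CompletableFuture` as a stand-in; `fakeRequest` is a hypothetical method and not part of any Elasticsearch API. In the TransportClient demos later in this article, the corresponding calls are `client.index(request).get()` and `client.index(request, listener)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncStylesSketch {

    // Hypothetical asynchronous operation standing in for a client call.
    static CompletableFuture<String> fakeRequest(String name) {
        return CompletableFuture.supplyAsync(() -> "acknowledged: " + name);
    }

    // Style 1: obtain a future and block until the result is available.
    // join() behaves like get() but reports failures as unchecked exceptions.
    static String blockingStyle(String name) {
        return fakeRequest(name).join();
    }

    // Style 2: register a callback that runs when the operation completes,
    // receiving either the response or the failure.
    static CompletableFuture<Void> listenerStyle(String name, AtomicReference<String> sink) {
        return fakeRequest(name).handle((response, failure) -> {
            sink.set(failure == null ? response : "failed: " + failure.getMessage());
            return null;
        });
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle("mess"));

        AtomicReference<String> sink = new AtomicReference<>();
        listenerStyle("mess", sink).join(); // wait only so the demo can print
        System.out.println(sink.get());
    }
}
```

The blocking style is simpler to read; the listener style keeps the calling thread free, which matters when many requests are in flight.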

2. Official guide

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

3. Compatibility

Use a client version that matches the version of the Elasticsearch server.

4. Maven dependency for the Java Client

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>

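5. Logging in the Java Client

The client logs through log4j2.
To use a different logging framework, bridge it through slf4j.

6. Init Client

Settings available when initializing the client:

cluster.name
The name of the cluster. Must be set if the cluster name is not the default, elasticsearch.
client.transport.sniff
Set to true to sniff the rest of the cluster and add its nodes to the list of nodes the client connects to.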
client.transport.ignore_cluster_name
Set to true to ignore cluster name validation of connected nodes. (since 0.19.4)
client.transport.ping_timeout
The time to wait for a ping response from a node. Defaults to 5s.
client.transport.nodes_sampler_interval
How often to sample / ping the nodes listed and connected. Defaults to 5s.

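V. Java Client usage examples

Note: the Transport client is used in almost the same way as the REST client. The differences are how the client is obtained and, for some operations, how the request is sent.

Preparation:

Before writing the examples, add the Java client to the Maven project, using the same version as the Elasticsearch server:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>

Given the addresses of one or more cluster nodes, the client spreads its requests across that set of addresses: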

InitDemo.java

package com.study.es_java_client;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class InitDemo {
    
    private static TransportClient client;

    public static TransportClient getClient() throws UnknownHostException {

        if(client == null) {
            //client = new PreBuiltTransportClient(Settings.EMPTY)
            
            // Settings for connecting to the cluster
            Settings settings = Settings.builder()
                    //.put("cluster.name", "myClusterName") // required if the cluster name is not the default "elasticsearch"
                    .put("client.transport.sniff", true) // sniff the cluster for other nodes
                    .build();   
            client = new PreBuiltTransportClient(settings)
                //.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
            
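            // Available connection settings
            /*
            cluster.name
                The name of the cluster. Must be set if the cluster name is not the default, elasticsearch.
            client.transport.sniff
                Set to true to sniff the rest of the cluster and add its nodes to the connection list.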
            client.transport.ignore_cluster_name
                Set to true to ignore cluster name validation of connected nodes. (since 0.19.4)    
            client.transport.ping_timeout
                The time to wait for a ping response from a node. Defaults to 5s.
            client.transport.nodes_sampler_interval
                How often to sample / ping the nodes listed and connected. Defaults to 5s.
            */
            
        }
        return client;
    }
}
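
InitDemo registers a single address, but further `.addTransportAddress(...)` calls can be chained to register more nodes. The official TransportClient documentation describes the client as contacting its known nodes in round-robin fashion. As a rough illustration of that idea only (this is not the client's actual implementation, and the addresses are hypothetical), a round-robin selector over a fixed list might look like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node address is required");
        }
        this.nodes = nodes;
    }

    // Returns the next node address, cycling through the list.
    // floorMod keeps the index non-negative even after the counter overflows.
    String next() {
        int i = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(i);
    }

    public static void main(String[] args) {
        // Hypothetical node addresses, for illustration only.
        RoundRobinSketch selector = new RoundRobinSketch(
                Arrays.asList("10.0.0.1:9300", "10.0.0.2:9300", "10.0.0.3:9300"));
        for (int request = 1; request <= 5; request++) {
            System.out.println("request " + request + " -> " + selector.next());
        }
    }
}
```

With three nodes, the fourth request goes back to the first node, so no single node receives all of the traffic.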

When using the Elasticsearch client, it helps to compare each call with the equivalent operation performed earlier in Kibana.

1. Create index

CreateIndexDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

public class CreateIndexDemo {

    public static void main(String[] args) {
        // Obtaining the client differs from the REST client
        try (TransportClient client = InitDemo.getClient();) {

            // 1. Create the create-index request
            CreateIndexRequest request = new CreateIndexRequest("mess");

            // 2. Set the index settings
            request.settings(Settings.builder().put("index.number_of_shards", 3) // number of shards
                    .put("index.number_of_replicas", 2) // number of replicas
                    .put("analysis.analyzer.default.tokenizer", "ik_smart") // default tokenizer (ik_smart comes from the IK analysis plugin)
            );

            // 3. Set the index mappings
            request.mapping("_doc",
                    "  {\n" +
                    "    \"_doc\": {\n" +
                    "      \"properties\": {\n" +
                    "        \"message\": {\n" +
                    "          \"type\": \"text\"\n" +
                    "        }\n" +
                    "      }\n" +
                    "    }\n" +
                    "  }",
                    XContentType.JSON);

            // 4. Set an alias for the index
            request.alias(new Alias("mmm"));

            // 5. Send the request; this call differs from the REST client
            CreateIndexResponse createIndexResponse = client.admin().indices()
                    .create(request).get();

            // 6. Handle the response
            boolean acknowledged = createIndexResponse.isAcknowledged();
            boolean shardsAcknowledged = createIndexResponse
                    .isShardsAcknowledged();
            System.out.println("acknowledged = " + acknowledged);
            System.out.println("shardsAcknowledged = " + shardsAcknowledged);

            // Sending the request with a listener instead
            /*ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(
                        CreateIndexResponse createIndexResponse) {
                    // 6. Handle the response
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    boolean shardsAcknowledged = createIndexResponse
                            .isShardsAcknowledged();
                    System.out.println("acknowledged = " + acknowledged);
                    System.out.println(
                            "shardsAcknowledged = " + shardsAcknowledged);
                }

                @Override
                public void onFailure(Exception e) {
                    System.out.println("Failed to create index: " + e.getMessage());
                }
            };
            client.admin().indices().create(request, listener);
            */
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

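2. Index document

Indexing a document means putting document data into an index. It is similar to inserting a row into a database table, where each row corresponds to one document.
IndexDocumentDemo.java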

package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

public class IndexDocumentDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
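        // Obtaining the client differs from the REST client
        try (TransportClient client = InitDemo.getClient();) {
            // 1. Create the index request
            IndexRequest request = new IndexRequest(
                    "mess",   // index
                    "_doc",     // mapping type
                    "11");     // document id
            
            // 2. Prepare the document data
            // Option 1: pass a JSON string directly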
            String jsonString = "{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
                    "}";
            request.source(jsonString, XContentType.JSON); 
            
            // Option 2: represent the document as a Map
            /*
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            request.source(jsonMap); 
            */
            
            // Option 3: build the document with XContentBuilder
            /*
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.field("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            request.source(builder); 
            */
            
            // Option 4: pass key-value pairs directly
            /*
            request.source("user", "kimchy",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");
            */
            
            // 3. Other optional settings
            /*
            request.routing("routing");  // routing value
            request.timeout(TimeValue.timeValueSeconds(1));  // how long to wait for the primary shard
            request.setRefreshPolicy("wait_for");  // refresh policy
            request.version(2);  // version number
            request.opType(DocWriteRequest.OpType.CREATE);  // operation type
            */
            
            // 4. Send the request
            IndexResponse indexResponse = null;
            try {
                // Option 1: client.index returns an ActionFuture<IndexResponse>; call get() to wait for the response
                indexResponse = client.index(request).get();
                
                // Option 2: client.prepareIndex creates the IndexRequest internally
                /*indexResponse = client.prepareIndex("mess","_doc","11")
                        .setSource(jsonString, XContentType.JSON)
                        .get();*/
                
                // Option 3: request + listener
                //client.index(request, listener);    
                
            } catch(ElasticsearchException e) {
                // Catch and handle the exception
                // Check for a version conflict, or a create on a document that already exists
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("Conflict; put the conflict-handling logic here.\n" + e.getDetailedMessage());
                }
                
                logger.error("Indexing failed", e);
            }catch (InterruptedException | ExecutionException e) {
                // Failures reported through Future.get() typically arrive wrapped in ExecutionException; inspect e.getCause()
                logger.error("Indexing failed", e);
            }
            
            
            
            
            // 5. Handle the response
            if(indexResponse != null) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("Document created; put the handling logic here.");
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("Document updated; put the handling logic here.");
                }
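                // Shard-level information
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    // Fewer shard copies succeeded than the total; handle this case here if needed
                }
                // If any replica shard failed, the failure reasons are available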
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason(); 
                        System.out.println("Replica failure reason: " + reason);
                    }
                }
            }
            
            
            // Listener style
            /*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                    
                }

                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.index(request, listener);
            */
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
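
The CREATED and UPDATED results handled in step 5 follow from how indexing by id behaves: writing to an id that does not exist yet creates the document, while writing again to the same id replaces it and increments its version. The toy in-memory model below sketches that behavior; `ToyIndex` is a hypothetical class for illustration and contains no Elasticsearch code.

```java
import java.util.HashMap;
import java.util.Map;

final class ToyIndex {

    enum Result { CREATED, UPDATED }

    // A stored document together with its version counter.
    static final class Stored {
        final String source;
        final long version;

        Stored(String source, long version) {
            this.source = source;
            this.version = version;
        }
    }

    private final Map<String, Stored> docs = new HashMap<>();

    // Stores the document under the given id and reports whether the id was new.
    // A new document starts at version 1; each overwrite adds 1.
    Result index(String id, String source) {
        Stored previous = docs.get(id);
        long version = previous == null ? 1 : previous.version + 1;
        docs.put(id, new Stored(source, version));
        return previous == null ? Result.CREATED : Result.UPDATED;
    }

    // Returns the current version of the document, or -1 if the id is unknown.
    long versionOf(String id) {
        Stored stored = docs.get(id);
        return stored == null ? -1 : stored.version;
    }

    public static void main(String[] args) {
        ToyIndex mess = new ToyIndex();
        System.out.println(mess.index("11", "{\"user\":\"kimchy\"}"));
        System.out.println(mess.index("11", "{\"user\":\"kimchy\",\"message\":\"updated\"}"));
        System.out.println("version: " + mess.versionOf("11"));
    }
}
```

Running IndexDocumentDemo twice against the same index shows the same pattern: the first run reports a created document and the second an updated one.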

3. Get document

Retrieving a document
GetDocumentDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

public class GetDocumentDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        // Obtaining the client differs from the REST client
        try (TransportClient client = InitDemo.getClient();) {
            // 1. Create the get request
            GetRequest request = new GetRequest(
                    "mess",   // index
                    "_doc",     // mapping type
                    "11");     // document id
            
            // 2. Optional settings
            //request.routing("routing");
            //request.version(2);
            
            //request.fetchSourceContext(new FetchSourceContext(false)); // whether to fetch the _source field
            // Select which fields to return
            String[] includes = new String[]{"message", "*Date"};
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext); 
            
            // The filter can also be written with excludes instead of includes
            /*String[] includes = Strings.EMPTY_ARRAY;
            String[] excludes = new String[]{"message"};
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);*/
            
            
            // Fetch stored fields
            /*request.storedFields("message"); 
            GetResponse getResponse = client.get(request).get();
            String message = getResponse.getField("message").getValue();*/
            
            
            // 3. Send the request
            GetResponse getResponse = null;
            try {
                getResponse = client.get(request).get();
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    logger.error("No document found with this id" );
                }
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("Version conflict on get; put the conflict-handling logic here." );
                }
                logger.error("Failed to get document", e);
            }catch (InterruptedException | ExecutionException e) {
                logger.error("Failed to get document", e);
            }
            
            // 4. Handle the response
            if(getResponse != null) {
                String index = getResponse.getIndex();
                String type = getResponse.getType();
                String id = getResponse.getId();
                if (getResponse.isExists()) { // the document exists
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString(); // source as a String
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();  // source as a Map
                    byte[] sourceAsBytes = getResponse.getSourceAsBytes();    // source as a byte array
                    
                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info(sourceAsString);
                    
                } else {
                    logger.error("No document found with this id" );
                }
            }
            
            
            // Sending the get request asynchronously
            /*
            ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse getResponse) {
                    
                }
            
                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.get(request, listener);
            */
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
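
The includes and excludes arrays in step 2 decide which `_source` fields come back, and `"*Date"` is a wildcard pattern. The sketch below shows the idea on top-level field names only; `SourceFilterSketch` and its methods are hypothetical helpers, and the real server-side filtering also handles nested paths, which this version does not attempt.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class SourceFilterSketch {

    // True if the field name matches the pattern, where '*' matches any run of characters.
    static boolean matches(String pattern, String field) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), field);
    }

    static boolean matchesAny(String[] patterns, String field) {
        for (String pattern : patterns) {
            if (matches(pattern, field)) {
                return true;
            }
        }
        return false;
    }

    // Keeps a field if it matches an include pattern (or includes is empty)
    // and matches no exclude pattern.
    static Map<String, Object> filter(Map<String, Object> source,
                                      String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            String field = entry.getKey();
            boolean included = includes.length == 0 || matchesAny(includes, field);
            if (included && !matchesAny(excludes, field)) {
                result.put(field, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");

        String[] includes = {"message", "*Date"};
        String[] excludes = {};
        System.out.println(filter(source, includes, excludes).keySet());
    }
}
```

With the includes used in GetDocumentDemo, `user` is dropped while `postDate` and `message` are kept.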

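4. Bulk

Bulk indexing puts many documents into an index in a single request. It is similar to inserting multiple rows into a database table in one batch, where each row corresponds to one document.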
BulkDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;

public class BulkDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        // Obtaining the client differs from the REST client
        try (TransportClient client = InitDemo.getClient();) {
            
            // 1. Create the bulk request
            BulkRequest request = new BulkRequest(); 
            request.add(new IndexRequest("mess", "_doc", "1")  
                    .source(XContentType.JSON,"field", "foo"));
            request.add(new IndexRequest("mess", "_doc", "2")  
                    .source(XContentType.JSON,"field", "bar"));
            request.add(new IndexRequest("mess", "_doc", "3")  
                    .source(XContentType.JSON,"field", "baz"));
            
            /*
            request.add(new DeleteRequest("mess", "_doc", "3")); 
            request.add(new UpdateRequest("mess", "_doc", "2") 
                    .doc(XContentType.JSON,"other", "test"));
            request.add(new IndexRequest("mess", "_doc", "4")  
                    .source(XContentType.JSON,"field", "baz"));
            */
            
            // 2. Optional settings
            /*
            request.timeout("2m");
            request.setRefreshPolicy("wait_for");  
            request.waitForActiveShards(2);
            */
            
            
            // 3. Send the request
        
            // Synchronous: block until the response arrives
            BulkResponse bulkResponse = client.bulk(request).get();
            
            
            // 4. Handle the response
            if(bulkResponse != null) {
                for (BulkItemResponse bulkItemResponse : bulkResponse) { 
                    // A failed item carries no response object, so check for failure first
                    if (bulkItemResponse.isFailed()) {
                        logger.error("Bulk item failed: " + bulkItemResponse.getFailureMessage());
                        continue;
                    }
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        //TODO handle a successful index or create

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                       //TODO handle a successful update

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        //TODO handle a successful delete
                    }
                }
            }
            
            
            // Sending the bulk request asynchronously
            /*
            ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkResponse) {
                    
                }
            
                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.bulk(request, listener);
            */
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

5. Search

Searching for data
SearchDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class SearchDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            
            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank"); 
            searchRequest.types("_doc");
            
            // 2. Build the request body with SearchSourceBuilder; its methods cover all the query options.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
            
            // Build the QueryBuilder
            /*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                    .fuzziness(Fuzziness.AUTO)
                    .prefixLength(3)
                    .maxExpansions(10);
            sourceBuilder.query(matchQueryBuilder);*/
            
            sourceBuilder.query(QueryBuilders.termQuery("age", 24)); 
            sourceBuilder.from(0); 
            sourceBuilder.size(10); 
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); 
            
            // Whether to return the _source field
            //sourceBuilder.fetchSource(false);
            
            // Select which fields to return
            /*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
            String[] excludeFields = new String[] {"_type"};
            sourceBuilder.fetchSource(includeFields, excludeFields);*/
            
            // Sorting
            //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); 
            //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));
            
            // Return profiling information
            //sourceBuilder.profile(true);
            
            // Attach the request body to the request
            searchRequest.source(sourceBuilder);
            
            // Optional settings
            //searchRequest.routing("routing");
            
            // Highlighting
            /*
            HighlightBuilder highlightBuilder = new HighlightBuilder(); 
            HighlightBuilder.Field highlightTitle =
                    new HighlightBuilder.Field("title"); 
            highlightTitle.highlighterType("unified");  
            highlightBuilder.field(highlightTitle);  
            HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
            highlightBuilder.field(highlightUser);
            sourceBuilder.highlighter(highlightBuilder);*/
            
            
            // Add an aggregation
            /*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
                    .field("company.keyword");
            aggregation.subAggregation(AggregationBuilders.avg("average_age")
                    .field("age"));
            sourceBuilder.aggregation(aggregation);*/
            
            // Add a suggester
            /*SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy"); 
                SuggestBuilder suggestBuilder = new SuggestBuilder();
                suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder); 
            sourceBuilder.suggest(suggestBuilder);*/
            
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            // 4. Handle the response
            // Status information for the search
            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();
            
            // Per-shard results
            int totalShards = searchResponse.getTotalShards();
            int successfulShards = searchResponse.getSuccessfulShards();
            int failedShards = searchResponse.getFailedShards();
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                // failures should be handled here
            }
            
            // Process the matching documents
            SearchHits hits = searchResponse.getHits();
            
            long totalHits = hits.getTotalHits();
            float maxScore = hits.getMaxScore();
            
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit
                
                String index = hit.getIndex();
                String type = hit.getType();
                String id = hit.getId();
                float score = hit.getScore();
                
                // Read the _source field
                String sourceAsString = hit.getSourceAsString(); // as a JSON string
                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map
                // Read field values from the map
                /*
                String documentTitle = (String) sourceAsMap.get("title"); 
                List<Object> users = (List<Object>) sourceAsMap.get("user");
                Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                */
                logger.info("index:" + index + "  type:" + type + "  id:" + id);
                logger.info(sourceAsString);
                
                // Read the highlight results
                /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField highlight = highlightFields.get("title"); 
                Text[] fragments = highlight.fragments();  
                String fragmentString = fragments[0].string();*/
            }
            
            // Read the aggregation results
            /*
            Aggregations aggregations = searchResponse.getAggregations();
            Terms byCompanyAggregation = aggregations.get("by_company"); 
            Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic"); 
            Avg averageAge = elasticBucket.getAggregations().get("average_age"); 
            double avg = averageAge.getValue();
            */
            
            // Read the suggestion results
            /*Suggest suggest = searchResponse.getSuggest(); 
            TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user"); 
            for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { 
                for (TermSuggestion.Entry.Option option : entry) { 
                    String suggestText = option.getText().string();
                }
            }
            */
            
            // Sending the search request asynchronously
            /*
            ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                @Override
                public void onResponse(SearchResponse getResponse) {
                    // process the results
                }
            
                @Override
                public void onFailure(Exception e) {
                    // handle the failure
                }
            };
            client.search(searchRequest, listener); 
            */
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }
}

6. Highlight

HighlightDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;

public class HighlightDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            
            // 1. Create the search request
            SearchRequest searchRequest = new SearchRequest("hl_test"); 
            
            // 2. Build the request body with SearchSourceBuilder; it exposes the methods for constructing every kind of query.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
            
            // Build the QueryBuilder
            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "lucene solr");
            sourceBuilder.query(matchQueryBuilder);
            
            // Pagination settings
            /*sourceBuilder.from(0); 
            sourceBuilder.size(5);*/ 
            
                    
            // Highlight settings
            HighlightBuilder highlightBuilder = new HighlightBuilder(); 
            highlightBuilder.requireFieldMatch(false).field("title").field("content")
                .preTags("<strong>").postTags("</strong>");
            // Each field can have its own settings, such as different tags
            /*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title"); 
            highlightTitle.preTags("<strong>").postTags("</strong>");
            highlightBuilder.field(highlightTitle);  
            HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
            highlightContent.preTags("<b>").postTags("</b>");
            highlightBuilder.field(highlightContent).requireFieldMatch(false);*/
            
            sourceBuilder.highlighter(highlightBuilder);
            
            searchRequest.source(sourceBuilder);
            
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            // 4. Process the response
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Process the matching documents
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();
                
                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {        
                    String index = hit.getIndex();
                    String type = hit.getType();
                    String id = hit.getId();
                    float score = hit.getScore();
                    
                    // Read the _source field
                    //String sourceAsString = hit.getSourceAsString(); // as a JSON string
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map
                    // Read field values from the map
                    /*String title = (String) sourceAsMap.get("title"); 
                    String content  = (String) sourceAsMap.get("content"); */
                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info("sourceMap : " +  sourceAsMap);
                    // Read the highlight results
                    Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                    HighlightField highlight = highlightFields.get("title"); 
                    if(highlight != null) {
                        Text[] fragments = highlight.fragments();  // a multi-valued field can yield several fragments
                        if(fragments != null && fragments.length > 0) {
                            String fragmentString = fragments[0].string();
                            logger.info("title highlight : " +  fragmentString);
                            // The highlighted string can replace the matching field in sourceAsMap before it is returned to the caller
                            //sourceAsMap.put("title", fragmentString);
                        }
                    }
                    
                    highlight = highlightFields.get("content"); 
                    if(highlight != null) {
                        Text[] fragments = highlight.fragments();  // a multi-valued field can yield several fragments
                        if(fragments != null && fragments.length > 0) {
                            String fragmentString = fragments[0].string();
                            logger.info("content highlight : " +  fragmentString);
                            // The highlighted string can replace the matching field in sourceAsMap before it is returned to the caller
                            //sourceAsMap.put("content", fragmentString);
                        }
                    }
                }
            }
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }
}
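The comments in HighlightDemo mention that a highlighted fragment can replace the matching field in the source map before the hit is returned to the caller. A minimal sketch of that merge step follows, using plain Java collections instead of the Elasticsearch types so that it runs on its own. The class name `HighlightMerge`, the method `applyHighlights`, and the `Map<String, String[]>` stand-in for the highlight fields are assumptions made for illustration; they are not part of the Elasticsearch API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class HighlightMerge {

    /**
     * Returns a copy of the source map in which every field that has at least
     * one highlighted fragment is replaced by its first fragment.
     * The input map is not modified.
     */
    public static Map<String, Object> applyHighlights(
            Map<String, Object> source, Map<String, String[]> fragmentsByField) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        for (Map.Entry<String, String[]> e : fragmentsByField.entrySet()) {
            String[] fragments = e.getValue();
            // Skip fields without fragments so the original value is kept.
            if (fragments != null && fragments.length > 0) {
                merged.put(e.getKey(), fragments[0]);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical hit: _source as a map, highlights as field -> fragments.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "lucene solr and elasticsearch");
        source.put("content", "plain content");

        Map<String, String[]> highlights = new HashMap<>();
        highlights.put("title",
                new String[] {"<strong>lucene</strong> <strong>solr</strong> and elasticsearch"});
        highlights.put("content", new String[0]); // no fragment: field stays as-is

        System.out.println(applyHighlights(source, highlights));
    }
}
```

In the demo above, the same idea would apply to `hit.getSourceAsMap()` and `hit.getHighlightFields()`, with each `Text` fragment converted through `string()`. Working on a copy keeps the original source available if the caller still needs the unhighlighted text.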

7. suggest (query suggestions)

SuggestDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestion;

public class SuggestDemo {
    
    private static Logger logger = LogManager.getRootLogger();  
    
    // Spell checking (term suggester)
    public static void termSuggest(TransportClient client) {
            
        // 1. Create the search request
        //SearchRequest searchRequest = new SearchRequest();
        SearchRequest searchRequest = new SearchRequest("mess"); 
        
        // 2. Build the request body with SearchSourceBuilder; it exposes the methods for constructing every kind of query.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
         
        sourceBuilder.size(0); 
        
        // Add the suggestion
        // Term suggester
        SuggestionBuilder<?> termSuggestionBuilder =
                SuggestBuilders.termSuggestion("user").text("kmichy"); 
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);     
        sourceBuilder.suggest(suggestBuilder);
        
        searchRequest.source(sourceBuilder);    

        try{
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            // 4. Process the response
            // Check the search status
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Get the suggestion results
                Suggest suggest = searchResponse.getSuggest(); 
                TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user"); 
                for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { 
                    logger.info("text: " + entry.getText().string());
                    for (TermSuggestion.Entry.Option option : entry) { 
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }

        } catch (InterruptedException | ExecutionException e) {
            logger.error(e);
        }
            /* Example of the "suggest" section in a term suggester response:
              "suggest": {
                "my-suggestion": [
                  {
                    "text": "tring",
                    "offset": 0,
                    "length": 5,
                    "options": [
                      {
                        "text": "trying",
                        "score": 0.8,
                        "freq": 1
                      }
                    ]
                  },
                  {
                    "text": "out",
                    "offset": 6,
                    "length": 3,
                    "options": []
                  },
                  {
                    "text": "elasticsearch",
                    "offset": 10,
                    "length": 13,
                    "options": []
                  }
                ]
              }*/

    }
    // Auto-completion (completion suggester)
    public static void completionSuggester(TransportClient client) {
                
        // 1. Create the search request
        //SearchRequest searchRequest = new SearchRequest();
        SearchRequest searchRequest = new SearchRequest("music"); 
        
        // 2. Build the request body with SearchSourceBuilder; it exposes the methods for constructing every kind of query.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
         
        sourceBuilder.size(0); 
        
        // Add the suggestion
        // Completion suggester; the equivalent REST request is:
        /*POST music/_search?pretty
                {
                    "suggest": {
                        "song-suggest" : {
                            "prefix" : "lucene s", 
                            "completion" : { 
                                "field" : "suggest" ,
                                "skip_duplicates": true
                            }
                        }
                    }
                }*/

        SuggestionBuilder<?> completionSuggestionBuilder =
                SuggestBuilders.completionSuggestion("suggest").prefix("lucene s")
                .skipDuplicates(true); 
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("song-suggest", completionSuggestionBuilder);     
        sourceBuilder.suggest(suggestBuilder);
        
        searchRequest.source(sourceBuilder);    
            
        try {
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            // 4. Process the response
            // Check the search status
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Get the suggestion results
                Suggest suggest = searchResponse.getSuggest(); 
                CompletionSuggestion completionSuggestion = suggest.getSuggestion("song-suggest"); 
                for (CompletionSuggestion.Entry entry : completionSuggestion.getEntries()) { 
                    logger.info("text: " + entry.getText().string());
                    for (CompletionSuggestion.Entry.Option option : entry) { 
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }

        } catch (InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            termSuggest(client);
            
            logger.info("--------------------------------------");
            
            completionSuggester(client);
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
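The term suggester returns one entry per token of the input text, each with a list of replacement options, as the sample response inside `termSuggest` shows. A common follow-up is to assemble a "did you mean" string from those entries. The sketch below does that with a small hypothetical `Entry` class standing in for `TermSuggestion.Entry`; it assumes the options arrive ordered best-first and simply takes the first one. `DidYouMean`, `Entry`, and `correct` are names made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DidYouMean {

    /** Hypothetical stand-in for one suggestion entry: a token plus its options. */
    public static final class Entry {
        final String text;
        final List<String> options; // assumed to be ordered best-first

        public Entry(String text, List<String> options) {
            this.text = text;
            this.options = options;
        }
    }

    /**
     * Builds a corrected phrase: each token is replaced by its first option,
     * or kept unchanged when the suggester returned no option for it.
     */
    public static String correct(List<Entry> entries) {
        List<String> words = new ArrayList<>();
        for (Entry entry : entries) {
            words.add(entry.options.isEmpty() ? entry.text : entry.options.get(0));
        }
        return String.join(" ", words);
    }

    public static void main(String[] args) {
        // Mirrors the sample response shown in termSuggest above.
        List<Entry> entries = Arrays.asList(
                new Entry("tring", Collections.singletonList("trying")),
                new Entry("out", Collections.<String>emptyList()),
                new Entry("elasticsearch", Collections.<String>emptyList()));

        System.out.println(correct(entries)); // prints "trying out elasticsearch"
    }
}
```

With the real client, the entries and options would come from `termSuggestion.getEntries()` and each option's `getText().string()`, as in the loop in `termSuggest`.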

8. aggregation (aggregation analysis)

AggregationDemo.java

package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class AggregationDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            
            // 1. Create the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank"); 
            
            // 2. Build the request body with SearchSourceBuilder; it exposes the methods for constructing every kind of query.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
             
            sourceBuilder.size(0); 

            // Add the aggregation
            // Terms aggregation: group documents by field value
            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                    .field("age").order(BucketOrder.aggregation("average_balance", true));
            // Compute the average balance of each group
            aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                    .field("balance"));
            sourceBuilder.aggregation(aggregation);
            
            searchRequest.source(sourceBuilder);
            
            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest).get();
                
            // 4. Process the response
            // Check the search status
            if(RestStatus.OK.equals(searchResponse.status())) {
                // Get the aggregation results
                Aggregations aggregations = searchResponse.getAggregations();
                Terms byAgeAggregation = aggregations.get("by_age"); 
                logger.info("aggregation by_age results");
                logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                logger.info("------------------------------------");
                for(Bucket buck : byAgeAggregation.getBuckets()) {
                    logger.info("key: " + buck.getKeyAsNumber());
                    logger.info("docCount: " + buck.getDocCount());
                    //logger.info("docCountError: " + buck.getDocCountError());
                    // Get the sub-aggregation
                    Avg averageBalance = buck.getAggregations().get("average_balance"); 

                    logger.info("average_balance: " + averageBalance.getValue());
                    logger.info("------------------------------------");
                }
                // Look up a bucket directly by its key
                /*Bucket elasticBucket = byAgeAggregation.getBucketByKey("24"); 
                Avg averageBalance = elasticBucket.getAggregations().get("average_balance"); 
                double avg = averageBalance.getValue();*/
                
            }
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }
}
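To make clear what the request in AggregationDemo computes, here is a rough in-memory equivalent written with Java streams: group accounts by `age`, average `balance` within each group, and sort the groups by that average in ascending order (the `true` flag passed to `BucketOrder.aggregation`). This is only an illustration of the semantics, not how Elasticsearch executes it; the real terms aggregation runs per shard and returns only a limited number of buckets. `TermsAvgSketch`, `Account`, and `averageBalanceByAge` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TermsAvgSketch {

    /** Hypothetical document with the two fields the aggregation uses. */
    public static final class Account {
        final int age;
        final double balance;

        public Account(int age, double balance) {
            this.age = age;
            this.balance = balance;
        }
    }

    /**
     * Groups accounts by age, averages the balance of each group, and returns
     * the (age, average balance) pairs sorted by ascending average.
     */
    public static List<Map.Entry<Integer, Double>> averageBalanceByAge(List<Account> accounts) {
        Map<Integer, Double> avgByAge = accounts.stream()
                .collect(Collectors.groupingBy(
                        a -> a.age,                                  // the "by_age" buckets
                        Collectors.averagingDouble(a -> a.balance))); // the "average_balance" metric
        List<Map.Entry<Integer, Double>> buckets = new ArrayList<>(avgByAge.entrySet());
        buckets.sort(Map.Entry.comparingByValue()); // ascending by average
        return buckets;
    }

    public static void main(String[] args) {
        List<Account> accounts = Arrays.asList(
                new Account(24, 100.0),
                new Account(24, 300.0),
                new Account(30, 50.0));

        for (Map.Entry<Integer, Double> bucket : averageBalanceByAge(accounts)) {
            System.out.println("key: " + bucket.getKey() + "  average_balance: " + bucket.getValue());
        }
        // prints:
        // key: 30  average_balance: 50.0
        // key: 24  average_balance: 200.0
    }
}
```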

9. Official documentation

Document API:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs.html

Search API:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-search.html

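VI. Spring Data Elasticsearch

Spring Data Elasticsearch integrates ES with Spring. It is worth knowing about, but I personally recommend using the native ES Java client.

Official reference: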

https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/

Code repository:

https://github.com/spring-projects/spring-data-elasticsearch

VII. Source code

https://github.com/leeSmall/Elasticsearch-Java-client-api

Reposted from: https://www.cnblogs.com/leeSmall/p/9218779.html

 

Note: to learn more about ES, see https://www.cnblogs.com/leeSmall/p/9218779.html for the details; that blog is very well written.
