ElasticSearch 8.x – Java API 操作

内容纲要

Java API 操作

随着 Elasticsearch 8.x 新版本的到来,Type 的概念被废除,为了适应这种数据结构的改变,Elasticsearch 官方从 7.15 版本开始建议使用新的 Elasticsearch Java Client。

1、增加依赖关系

<properties>
    <!-- Compile for Java 8. -->
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <!-- Single place to set the version shared by the Elasticsearch artifacts below. -->
    <elastic.version>8.1.0</elastic.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.elasticsearch.plugin</groupId>
        <artifactId>x-pack-sql-jdbc</artifactId>
        <!-- Use the shared property instead of repeating the literal version. -->
        <version>${elastic.version}</version>
    </dependency>
    <!-- Provides the co.elastic.clients.* classes used by the client code. -->
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>${elastic.version}</version>
    </dependency>
    <!-- Jackson is needed because the client is built with JacksonJsonpMapper. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>

2、获取客户端对象

就像连接 MySQL 数据库一样,Java 通过客户端操作 Elasticsearch 也要获取到连接后才可以。

现在使用的是基于 https 的安全 Elasticsearch 服务,所以首先需要将之前生成的证书进行转换:

# Export the certificate from the PKCS#12 bundle into java-ca.crt; -nokeys omits private keys.
openssl pkcs12 -in elastic-stack-ca.p12 -clcerts -nokeys -out java-ca.crt

配置证书后,我们就可以采用 https 方式获取连接对象了。

// 导入的类
import co.elastic.clients.elasticsearch.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.*;
import org.apache.http.client.*;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.*;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.*;
import org.elasticsearch.client.*;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.*;
import java.security.KeyStore;
import java.security.cert.*;

public class ElasticSearchUtil {
    public static operate() {
        # 获取客户端对象
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
         new UsernamePasswordCredentials("elastic", "O3x0hfu7i=ZbQvlktCnd"));
        Path caCertificatePath = Paths.get("ca.crt");
        CertificateFactory factory =
         CertificateFactory.getInstance("X.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
         trustedCa = factory.generateCertificate(is);
        }
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        SSLContextBuilder sslContextBuilder = SSLContexts.custom()
         .loadTrustMaterial(trustStore, null);
        final SSLContext sslContext = sslContextBuilder.build();
        RestClientBuilder builder = RestClient.builder(
         new HttpHost("linux1", 9200, "https"))
         .setHttpClientConfigCallback(new 
        RestClientBuilder.HttpClientConfigCallback() {
         @Override
         public HttpAsyncClientBuilder customizeHttpClient(
         HttpAsyncClientBuilder httpClientBuilder) {
         return httpClientBuilder.setSSLContext(sslContext)
         .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
         .setDefaultCredentialsProvider(credentialsProvider);
         }
         });
        RestClient restClient = builder.build();
        ElasticsearchTransport transport = new RestClientTransport(
         restClient, new JacksonJsonpMapper());
        ElasticsearchClient client = new ElasticsearchClient(transport);
        ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
        ...
        transport.close();
    }
}

3、操作数据(普通操作)

3.1 索引操作

// Create the index "myindex" and report whether the cluster acknowledged it.
CreateIndexRequest.Builder createBuilder = new CreateIndexRequest.Builder();
createBuilder.index("myindex");
CreateIndexResponse created = client.indices().create(createBuilder.build());
System.out.println("创建索引成功:" + created.acknowledged());

// Look the index up again and print what the server returns for it.
GetIndexRequest.Builder getBuilder = new GetIndexRequest.Builder();
getBuilder.index("myindex");
GetIndexResponse fetched = client.indices().get(getBuilder.build());
System.out.println("索引查询成功:" + fetched.result());

// Delete the index and print the acknowledgement flag.
DeleteIndexRequest.Builder deleteBuilder = new DeleteIndexRequest.Builder();
deleteBuilder.index("myindex");
DeleteIndexResponse deleted = client.indices().delete(deleteBuilder.build());
System.out.println("删除索引成功:" + deleted.acknowledged());

3.2 文档操作

// Index a single document, using the user's id as the document id.
IndexRequest indexRequest = new IndexRequest.Builder()
        .index("myindex")
        .id(user.getId().toString())
        .document(user)
        .build();
final IndexResponse index = client.index(indexRequest);
System.out.println("文档操作结果:" + index.result());

// Bulk-create five documents with ids 2001..2005.
final List<BulkOperation> operations = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
    // Parameterized builder (not a raw type); it is not called "builder" so it
    // cannot clash with the RestClientBuilder local of that name in operate().
    final CreateOperation.Builder<User> createBuilder = new CreateOperation.Builder<>();
    createBuilder.index("myindex");
    createBuilder.id("200" + i);
    createBuilder.document(new User(2000 + i, 30 + i * 10, "zhangsan" + i, "beijing",
            1000 + i * 1000));
    final CreateOperation<User> createOperation = createBuilder.build();
    operations.add(new BulkOperation.Builder().create(createOperation).build());
}
BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
final BulkResponse bulkResponse = client.bulk(bulkRequest);
if (bulkResponse.errors()) {
    // Individual items can fail even when the bulk call itself returns
    // normally, so report each failed item instead of claiming success.
    System.out.println("批量操作存在失败项");
    bulkResponse.items().forEach(item -> {
        if (item.error() != null) {
            System.out.println(item.id() + ":" + item.error().reason());
        }
    });
} else {
    System.out.println("数据操作成功:" + bulkResponse);
}

// Delete the document with id 1001.
DeleteRequest deleteRequest = new DeleteRequest.Builder().index("myindex").id("1001").build();
client.delete(deleteRequest);

3.3 文档查询

// Match query on the "city" field. It targets "myindex", the index the
// documents above were written to (the original searched "myindex1", which
// nothing in this walkthrough creates).
MatchQuery matchQuery = new MatchQuery.Builder()
        .field("city")
        .query(FieldValue.of("beijing"))
        .build();
Query query = new Query.Builder().match(matchQuery).build();
SearchRequest searchRequest = new SearchRequest.Builder()
        .index("myindex")
        .query(query)
        .build();
// Hits are deserialized as plain Object because no document class is supplied.
final SearchResponse<Object> search = client.search(searchRequest, Object.class);
System.out.println(search);

操作数据(函数操作)

1、索引操作

// 创建索引
final Boolean acknowledged = client.indices().create(p -> p.index("")).acknowledged();
System.out.println("创建索引成功");
// 获取索引
System.out.println(
 client.indices().get(
 req -> req.index("myindex1")
).result());
// 删除索引
client.indices().delete(
 reqbuilder -> reqbuilder.index("myindex")
).acknowledged();

2、文档操作

// 创建文档
System.out.println(
    client.index(
        req ->
            req.index("myindex")
            .id(user.getId().toString())
            .document(user)
    ).result()
);
// 批量创建文档
client.bulk(
    req -> {
        users.forEach(
            u -> {
                req.operations(
                    b -> {
                        b.create(
                            d ->  d.id(u.getId().toString()).index("myindex").document(u)
                        );
                         return b;
                     }
                 );
             }
         );
     return req;
    }
);
// 删除文档
client.delete(
    req -> req.index("myindex").id("1001")
);

3、文档查询

// Match query on the "city" field; hits are mapped to plain Object.
client.search(
        s -> s.query(q -> q.match(m -> m.field("city").query("beijing"))),
        Object.class);

4、客户端异步操作

ES Java API 提供了同步和异步的两种客户端处理。之前演示的都是同步处理,异步客户端的处理和同步客户端处理的 API 基本原理相同,不同的是需要异步对返回结果进行相应的处理。

// Create the index asynchronously; the callback runs once the request finishes.
asyncClient.indices()
        .create(req -> req.index("newindex"))
        .whenComplete((response, failure) -> {
            System.out.println("回调函数");
            if (response == null) {
                // No response was delivered, so print the failure.
                failure.printStackTrace();
            } else {
                System.out.println(response.acknowledged());
            }
        });
// Printed by the calling thread right after the request is submitted.
System.out.println("主线程操作...");
asyncClient.indices().create(
    req -> {
        req.index("newindex");
        return req;
    }
)
.thenApply(
    resp -> {
        return resp.acknowledged();
    }
)
.whenComplete(
    (resp, error) -> {
        System.out.println("回调函数");
        if ( !resp ) {
            System.out.println();
        } else {
            error.printStackTrace();
        }
    }
);

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward