内容纲要
Java API 操作
随着 Elasticsearch 8.x 新版本的到来,Type 的概念被废除,为了适应这种数据结构的改变,Elasticsearch 官方从 7.15 版本开始建议使用新的 Elasticsearch Java Client。
1、增加依赖关系
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <!-- Single place to set the version shared by the Elasticsearch artifacts below. -->
    <elastic.version>8.1.0</elastic.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.elasticsearch.plugin</groupId>
        <artifactId>x-pack-sql-jdbc</artifactId>
        <!-- Uses the shared property so it cannot drift from elasticsearch-java. -->
        <version>${elastic.version}</version>
    </dependency>
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>${elastic.version}</version>
    </dependency>
    <!-- JSON mapping backend; the client code below builds a JacksonJsonpMapper. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>
2、获取客户端对象
就像连接 MySQL 数据库一样,Java 通过客户端操作 Elasticsearch 也要获取到连接后才可以。
现在使用的是基于 https 的安全 Elasticsearch 服务,所以首先需要将之前的证书转换为 Java 可以读取的格式:
openssl pkcs12 -in elastic-stack-ca.p12 -clcerts -nokeys -out java-ca.crt
配置证书后,我们就可以采用 https 方式获取连接对象了。
// 导入的类
import co.elastic.clients.elasticsearch.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.*;
import org.apache.http.client.*;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.*;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.*;
import org.elasticsearch.client.*;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.*;
import java.security.KeyStore;
import java.security.cert.*;
public class ElasticSearchUtil {
public static operate() {
# 获取客户端对象
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "O3x0hfu7i=ZbQvlktCnd"));
Path caCertificatePath = Paths.get("ca.crt");
CertificateFactory factory =
CertificateFactory.getInstance("X.509");
Certificate trustedCa;
try (InputStream is = Files.newInputStream(caCertificatePath)) {
trustedCa = factory.generateCertificate(is);
}
KeyStore trustStore = KeyStore.getInstance("pkcs12");
trustStore.load(null, null);
trustStore.setCertificateEntry("ca", trustedCa);
SSLContextBuilder sslContextBuilder = SSLContexts.custom()
.loadTrustMaterial(trustStore, null);
final SSLContext sslContext = sslContextBuilder.build();
RestClientBuilder builder = RestClient.builder(
new HttpHost("linux1", 9200, "https"))
.setHttpClientConfigCallback(new
RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext)
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setDefaultCredentialsProvider(credentialsProvider);
}
});
RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
...
transport.close();
}
}
3、操作数据(普通操作)
3.1 索引操作
// Create the index and report whether the cluster acknowledged it
final CreateIndexRequest createRequest = new CreateIndexRequest.Builder()
        .index("myindex")
        .build();
System.out.println("创建索引成功:" + client.indices().create(createRequest).acknowledged());

// Look the index up and print what the response's result() returns
final GetIndexRequest lookupRequest = new GetIndexRequest.Builder()
        .index("myindex")
        .build();
System.out.println("索引查询成功:" + client.indices().get(lookupRequest).result());

// Delete the index and report whether the cluster acknowledged it
final DeleteIndexRequest dropRequest = new DeleteIndexRequest.Builder()
        .index("myindex")
        .build();
System.out.println("删除索引成功:" + client.indices().delete(dropRequest).acknowledged());
3.2 文档操作
// Index a single document, using the user's id as the document id.
// The request is parameterized with User instead of raw types.
// NOTE(review): assumes `user` is a User instance — confirm against the caller.
final IndexRequest<User> indexRequest = new IndexRequest.Builder<User>()
        .index("myindex")
        .id(user.getId().toString())
        .document(user)
        .build();
final IndexResponse index = client.index(indexRequest);
System.out.println("文档操作结果:" + index.result());
// Bulk-create five User documents with ids "2001".."2005"
final List<BulkOperation> operations = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
    // Typed builder (no raw types) so the operation carries User documents.
    final CreateOperation<User> createOperation = new CreateOperation.Builder<User>()
            .index("myindex")
            .id("200" + i)
            .document(new User(2000 + i, 30 + i * 10, "zhangsan" + i, "beijing",
                    1000 + i * 1000))
            .build();
    operations.add(new BulkOperation.Builder().create(createOperation).build());
}
final BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
final BulkResponse bulkResponse = client.bulk(bulkRequest);
// Individual bulk items can fail without the call throwing, so check errors()
// before reporting success.
if (bulkResponse.errors()) {
    bulkResponse.items().forEach(item -> {
        if (item.error() != null) {
            System.out.println("文档 " + item.id() + " 操作失败:" + item.error().reason());
        }
    });
} else {
    System.out.println("数据操作成功:" + bulkResponse);
}
// 删除文档
DeleteRequest deleteRequest = new DeleteRequest.Builder().index("myindex").id("1001").build();
client.delete(deleteRequest);
3.3 文档查询
// Match query on the "city" field. The search targets "myindex", the index the
// document examples write to.
final MatchQuery matchQuery = new MatchQuery.Builder()
        .field("city")
        .query(FieldValue.of("beijing"))
        .build();
final Query query = new Query.Builder().match(matchQuery).build();
final SearchRequest searchRequest = new SearchRequest.Builder()
        .index("myindex")
        .query(query)
        .build();
final SearchResponse<Object> search = client.search(searchRequest, Object.class);
System.out.println(search);
操作数据(函数操作)
1、索引操作
// Create the index (a non-empty name is required) and print the acknowledgement
final boolean acknowledged = client.indices().create(p -> p.index("myindex")).acknowledged();
System.out.println("创建索引成功:" + acknowledged);
// Get the index that was just created
System.out.println(
        client.indices().get(
                req -> req.index("myindex")
        ).result());
// Delete the index and print the acknowledgement instead of discarding it
System.out.println("删除索引成功:"
        + client.indices().delete(
                reqbuilder -> reqbuilder.index("myindex")
        ).acknowledged());
2、文档操作
// Index a single document and print the result of the operation
System.out.println(
        client.index(b -> b
                .index("myindex")
                .id(user.getId().toString())
                .document(user)
        ).result()
);
// Bulk-create: add one create operation per user to the same bulk request
client.bulk(bulk -> {
    users.forEach(u ->
            bulk.operations(op ->
                    op.create(c -> c.id(u.getId().toString()).index("myindex").document(u))
            )
    );
    return bulk;
});
// Delete the document with id "1001"
client.delete(d -> d.index("myindex").id("1001"));
3、文档查询
// Match query on the "city" field, limited to "myindex" (the index the document
// examples write to). The response is printed so the result is visible.
System.out.println(
        client.search(
                req -> req
                        .index("myindex")
                        .query(q -> q.match(m -> m.field("city").query("beijing"))),
                Object.class
        )
);
4、客户端异步操作
ES Java API 提供了同步和异步的两种客户端处理。之前演示的都是同步处理,异步客户端的处理和同步客户端处理的 API 基本原理相同,不同的是需要异步对返回结果进行相应的处理。
// Create the index asynchronously; the callback runs once the request completes
asyncClient.indices()
        .create(b -> b.index("newindex"))
        .whenComplete((response, failure) -> {
            System.out.println("回调函数");
            if (response == null) {
                failure.printStackTrace();
                return;
            }
            System.out.println(response.acknowledged());
        });
// Printed by the calling thread right after the request is submitted
System.out.println("主线程操作...");
asyncClient.indices().create(
req -> {
req.index("newindex");
return req;
}
)
.thenApply(
resp -> {
return resp.acknowledged();
}
)
.whenComplete(
(resp, error) -> {
System.out.println("回调函数");
if ( !resp ) {
System.out.println();
} else {
error.printStackTrace();
}
}
);