elasticsearch初体验
产品那边想要做个文章搜索功能，考虑到接口性能问题，我决定使用 Elasticsearch 来实现搜索功能。
0)实现思路
首先通过 HTTP 接口从内容服务获取开放的栏目列表，逐个遍历。
然后根据栏目名称，从对应的 MySQL 数据表中按编号 no 升序查询一定数量的数据，并用 Redis 记录最后一条数据的 no。
接下来把查出的数据逐条简单处理，再通过 es 的接口存入 es 中。
过几秒再重复这个过程，但查询数据时会带上上次记录的 no 作为条件，之前取过的数据就不再重复获取。
就这么干吧!下面是全部代码。
1)用docker启动es实例
先用 docker 起一个 es 实例吧！itmx/dzkd-content-es:6.5.4 镜像仅在官方镜像的基础上安装了 analysis-ik 中文分词插件。下面的 docker-compose 文件还同时启动了同步程序需要的 Redis。
# docker-compose file: one Elasticsearch node (image with the IK analyzer) plus Redis on a shared network.
version: '2.2'
services:
  elasticsearch:
    image: itmx/dzkd-content-es:6.5.4
    restart: always
    environment:
      - cluster.name=content-cluster
      # memory_lock only works together with the unlimited memlock ulimit below
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      # quoted like the redis mapping; unquoted "a:b" values can be misread by YAML 1.1 parsers
      - "9200:9200"
    networks:
      - esnet
  redis:
    image: daocloud.io/library/redis:5.0
    volumes:
      - ./redis-data:/data
    restart: always
    ports:
      - "6379:6379"
    networks:
      - esnet
volumes:
  esdata1:
networks:
  esnet:
2)编写同步数据的java程序
我的数据库里面有大量的文章，需要同步到 es 实例中，这里我准备用 elasticsearch-rest-high-level-client 往 es 里写入数据。
1.pom.xml 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cc.itmx.util</groupId>
    <artifactId>itmx-content-transfer-to-es</artifactId>
    <version>1.0.0</version>
    <name>itmx-content-transfer-to-es</name>
    <description>本程序用于将MySQL中的内容数据同步到es数据库中</description>
    <properties>
        <java.version>1.8</java.version>
        <!-- Spring Boot 2.1.x manages Elasticsearch 6.4.x. Without this override the transitive
             org.elasticsearch artifacts of the 6.5.4 high-level client are downgraded, giving a
             mixed-version classpath. Keep it equal to the ES server version. -->
        <elasticsearch.version>6.5.4</elasticsearch.version>
    </properties>
    <dependencies>
        <!-- version (2.1.2.RELEASE) and compile scope are inherited from the Spring Boot parent -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.12.1</version>
        </dependency>
        <!-- 1.2.47 is affected by the publicly known autoType-bypass RCE; 1.2.83 is the patched 1.x release -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
2.编写配置类MyConfig
package cc.itmx.util.common;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Typed view of the {@code my-config} section of application.yml, bound by Spring's
 * {@code @ConfigurationProperties}. Getters and setters are generated by Lombok {@code @Data}.
 *
 * @author ITMX
 */
@Data
@Component
@ConfigurationProperties(prefix = "my-config")
public class MyConfig {
// Elasticsearch host name or IP address.
private String esHost;
// Elasticsearch HTTP port.
private Integer esPort;
// Scheme used to reach Elasticsearch, e.g. "http".
private String esScheme;
// Optional ES user name; basic auth is only set up when both esUser and esPasswd are non-empty.
private String esUser;
// Optional ES password, see esUser.
private String esPasswd;
// JDBC URL of the MySQL server; the transfer SQL qualifies every table with a schema name,
// so the URL does not need to select a database.
private String mysqlUri;
private String mysqlUsername;
private String mysqlPassword;
// Base URL of the content service; category-list paths such as "/list/news" are appended to it.
private String contentHost;
// Schema that holds the news tables.
private String mysqlNewsDb;
// Schema that holds the video tables.
private String mysqlVideoDb;
// Maximum number of rows read per table in one sync pass (used as the SQL LIMIT).
private Integer transferLimit;
}
对应的 application.yml 文件
my-config:
  es-host: 192.168.1.251
  es-port: 19200
  es-scheme: http
  # Optional: set both to enable basic auth against Elasticsearch.
  # es-user: elastic
  # es-passwd: changeme
  mysql-uri: jdbc:mysql://192.168.1.251:3306
  mysql-username: root
  # Placeholder. It must stay quoted: an unquoted value starting with '*' is a YAML alias and the file fails to load.
  mysql-password: "******"
  mysql-news-db: news-db
  mysql-video-db: video-db
  content-host: http://192.168.1.251:81
  transfer-limit: 1000
spring:
  redis:
    host: 192.168.1.251
    port: 6379
    database: 1
    password:
3.抽象公共父类 AbstractTransfer，复用通用方法
package cc.itmx.util.common;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import java.sql.Connection;
import java.sql.*;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
/**
* @author ITMX
*/
@Slf4j
public abstract class AbstractTransfer {
private final OkHttpClient HTTP_CLIENT = new OkHttpClient();
private Set<String> tableList = new HashSet<>();
private final RedisTemplate<String, String> redisTemplate;
protected final MyConfig myConfig;
private final RestHighLevelClient client;
private final CredentialsProvider credentialsProvider;
protected AbstractTransfer(RedisTemplate<String, String> redisTemplate, MyConfig myConfig, RestHighLevelClient client) {
this.redisTemplate = redisTemplate;
this.myConfig = myConfig;
credentialsProvider = !StringUtils.isEmpty(myConfig.getEsUser()) && !StringUtils.isEmpty(myConfig.getEsPasswd())? new BasicCredentialsProvider():null;
if(credentialsProvider != null) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(myConfig.getEsUser(), myConfig.getEsPasswd()));
}
this.client = client == null? new RestHighLevelClient(
RestClient.builder(
new HttpHost(myConfig.getEsHost(),
myConfig.getEsPort(),
myConfig.getEsScheme())).setHttpClientConfigCallback(httpAsyncClientBuilder -> {
if(credentialsProvider != null){
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpAsyncClientBuilder;
})): client;
configMappings(String.format("%s://%s:%d/content", myConfig.getEsScheme(), myConfig.getEsHost(), myConfig.getEsPort()));
}
/**
* 处理MySQL结果集
* @return
* @param rsmd
* @param colCount
* @param rs
* @param tableName
*/
public abstract Map<String, String> handelResultSet(ResultSetMetaData rsmd, int colCount, ResultSet rs, String tableName);
private LinkedList<Map<String, String>> fetchData(String tableName) {
LinkedList<Map<String, String>> list = new LinkedList<>();
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
log.error("缺少mysql驱动:{}", e.getMessage());
throw new RuntimeException("缺少mysql驱动");
}
Connection conn;
try {
conn = DriverManager.getConnection(myConfig.getMysqlUri(), myConfig.getMysqlUsername(), myConfig.getMysqlPassword());
log.info("成功连接MySQL,当前数据表:{}", tableName);
} catch (SQLException e) {
log.error("数据库连接异常:{}", e.getMessage(), e);
return list;
}
String lastNo = redisTemplate.opsForValue().get(this.getClass().getSimpleName() + "lastNo:" + tableName);
String timeWhere = StringUtils.isEmpty(lastNo) ? "" : String.format(" AND `no` > %s ", lastNo);
PreparedStatement ps;
try {
ps = conn.prepareStatement(getSql(tableName,timeWhere), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
} catch (SQLException e) {
log.error("SQL 语句预编译失败:{}", e.getMessage(), e);
try {
conn.close();
} catch (SQLException ignore) {
}
return list;
}
ResultSet rs;
try {
rs = ps.executeQuery();
} catch (SQLException e) {
log.error("执行SQL查询出现异常:{}", e.getMessage(), e);
try {
ps.close();
} catch (SQLException ignore) {
}
return list;
}
ResultSetMetaData rsmd;
int colCount;
try {
rsmd = rs.getMetaData();
colCount = rsmd.getColumnCount();
} catch (SQLException e) {
log.error("从SQL结果中获取列信息失败:{}", e.getMessage(), e);
try {
rs.close();
} catch (SQLException ignore) {
}
try {
ps.close();
} catch (SQLException ignore) {
}
try {
conn.close();
} catch (SQLException ignore) {
}
return list;
}
try {
Map<String, String> obj;
while (rs.next()) {
obj = handelResultSet(rsmd,colCount,rs,tableName);
if(obj == null){
continue;
}
list.add(obj);
}
} catch (Exception e) {
log.error("读取数据出现异常:", e);
return list;
} finally {
try {
rs.close();
} catch (SQLException ignore) {
}
try {
ps.close();
} catch (SQLException ignore) {
}
try {
conn.close();
} catch (SQLException ignore) {
}
}
log.info("获取[{}]表中的数据完毕,本次传准备输数据量:{}", tableName, list.size());
return list;
}
protected abstract String getSql(String tableName, String timeWhere);
private void insertToEs(String table, LinkedList<Map<String, String>> list) {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
String lastNo = list.getLast().get("no");
list.forEach(o -> {
String id = o.get("id");
o.remove("id");
o.remove("no");
request.add(new IndexRequest("content", "default", id).source(o));
});
BulkResponse bulkResponse;
try {
bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("批量提交插入请求失败:", e);
return;
}
if (bulkResponse.hasFailures()) {
Stream.of(bulkResponse.getItems())
.filter(BulkItemResponse::isFailed)
.forEach(item -> log.error("插入es失败:{}", item.getFailureMessage()));
} else {
log.info("全部插入成功,数量:{}", list.size());
redisTemplate.opsForValue().set(this.getClass().getSimpleName() + "lastNo:" + table, lastNo);
log.info("更新编号条件成功:{}=>{}", table, lastNo);
}
}
protected void fetchCategoryList(String url){
Request request = new Request.Builder()
.url(myConfig.getContentHost() + url)
.build();
try (Response response = HTTP_CLIENT.newCall(request).execute()) {
if (response.code() != HttpStatus.SC_OK) {
response.close();
return;
}
ResponseBody body = response.body();
if (body == null) {
response.close();
return;
}
String result = body.string();
if (StringUtils.isEmpty(result)) {
response.close();
return;
}
response.close();
JSONObject jsonObject = JSON.parseObject(result);
JSONArray list = jsonObject.getJSONArray("list");
tableList.addAll(list.toJavaList(String.class));
log.info("刷新栏目列表完毕:{}", tableList);
} catch (Exception e) {
log.error("刷新栏目列表异常。", e);
}
}
/**
* 配置分词规则
*/
private void configMappings(String url) {
String postJson = "{\"settings\":{\"number_of_replicas\":0,\"number_of_shards\":5},\"mappings\":{\"default\":{\"properties\":{\"title\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},\"content\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},\"sourceName\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},\"img\":{\"type\": \"text\",\"index\":false},\"category\":{\"type\": \"text\",\"index\":false},\"type\":{\"type\": \"text\",\"index\":false},\"createTime\":{\"type\": \"date\",\"format\":\"yyy-MM-dd HH:mm:ss\"}}}}}";
RequestBody body = RequestBody.create(MediaType.get("application/json; charset=utf-8"), postJson);
Request request = new Request.Builder()
.url(url)
.put(body)
.build();
try (Response response = HTTP_CLIENT.newCall(request).execute()) {
response.close();
if (response.code() == HttpStatus.SC_OK) {
log.info("配置默认分词字段成功");
}else{
log.warn("配置默认分词字段失败或已配置");
}
} catch (Exception e) {
log.error("配置默认分词字段失败", e);
throw new RuntimeException("配置默认分词字段失败");
}
}
protected void run() {
log.info("准备数据传输...");
tableList.forEach(table -> {
LinkedList<Map<String, String>> list = fetchData(table);
if (list.isEmpty()) {
return;
}
insertToEs(table, list);
});
}
/**
* 在子类中定时调用,此为入口
*/
protected abstract void main();
/**
* 在子类中定时调用,获取栏目列表
*/
protected abstract void fetchCategoryList();
}
4.文章内容同步实现类
package cc.itmx.util.biz;
import cc.itmx.util.common.AbstractTransfer;
import cc.itmx.util.common.MyConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;
/**
 * Syncs news articles into Elasticsearch. Each row joins a news list table with its {@code <table>_content} table.
 *
 * @author ITMX
 */
@Slf4j
@Component
public class TransferNewsContent extends AbstractTransfer {
    @Autowired
    public TransferNewsContent(RedisTemplate<String, String> redisTemplate, MyConfig myConfig) {
        // null client: the base class builds one from myConfig
        super(redisTemplate, myConfig, null);
    }

    /**
     * Copies every column of the current row and turns the {@code images} column into a single {@code img} field.
     *
     * @return the document fields, or {@code null} if the row could not be read
     */
    @Override
    public Map<String, String> handelResultSet(ResultSetMetaData rsmd, int colCount, ResultSet rs, String tableName) {
        Map<String, String> obj = new HashMap<>(colCount + 3);
        try {
            for (int i = 1; i <= colCount; i++) {
                obj.put(rsmd.getColumnName(i), rs.getString(i));
            }
        } catch (Exception e) {
            log.error("解析MySQL结果集异常:", e);
            return null;
        }
        // images holds ';'-separated values; only the first one is kept as img.
        // A NULL images column used to throw an NPE here. The article was then silently dropped
        // while the sync cursor still moved past it, so it is mapped to "" instead.
        String images = obj.remove("images");
        obj.put("img", images == null ? "" : images.split(";", 2)[0]);
        obj.put("type", "news");
        obj.put("category", tableName);
        obj.put("url", "");
        return obj;
    }

    /**
     * Selects the next batch from the list table {@code tableName} joined with {@code tableName_content},
     * skipping deleted rows and ordered by the list table's {@code no}. {@code timeWhere} is appended to the join condition.
     */
    @Override
    protected String getSql(String tableName, String timeWhere) {
        return String.format("SELECT c.`id`,c.`title`,c.`sourceName`,c.`content`,c.`createTime`,l.`images`,l.`no` FROM `%s`.`%s` l INNER JOIN `%s`.`%s_content` c ON l.id = c.id AND l.del_flag = 0 %s ORDER BY l.`no` ASC LIMIT %d;", myConfig.getMysqlNewsDb(), tableName, myConfig.getMysqlNewsDb(), tableName, timeWhere, myConfig.getTransferLimit());
    }

    /** Refreshes the news category list, 10 s after the previous refresh finished. */
    @Override
    @Scheduled(fixedDelay = 10_000)
    public void fetchCategoryList() {
        fetchCategoryList("/list/news");
    }

    /** Runs one news sync pass, 4 s after the previous pass finished. */
    @Override
    @Scheduled(fixedDelay = 4_000)
    public void main() {
        super.run();
    }
}
5.视频内容同步实现类
package cc.itmx.util.biz;
import cc.itmx.util.common.AbstractTransfer;
import cc.itmx.util.common.MyConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;
/**
 * Syncs rows of the video tables into Elasticsearch.
 *
 * @author ITMX
 */
@Slf4j
@Component
public class TransferVideoList extends AbstractTransfer {

    @Autowired
    public TransferVideoList(RedisTemplate<String, String> redisTemplate, MyConfig myConfig) {
        // passing no client lets the base class create one from myConfig
        super(redisTemplate, myConfig, null);
    }

    /**
     * Copies every column of the current row and renames thumbUrl/source to the index field names img/sourceName.
     *
     * @return the document fields, or {@code null} if the row could not be read
     */
    @Override
    public Map<String, String> handelResultSet(ResultSetMetaData rsmd, int colCount, ResultSet rs, String tableName) {
        Map<String, String> row = new HashMap<>(colCount + 3);
        try {
            int column = 1;
            while (column <= colCount) {
                row.put(rsmd.getColumnName(column), rs.getString(column));
                column++;
            }
            // Map.remove returns the removed value, so each line both renames and drops the DB column
            row.put("img", row.remove("thumbUrl"));
            row.put("sourceName", row.remove("source"));
            row.put("type", "小视频".equals(tableName) ? "wuli" : "video");
            row.put("category", tableName);
            return row;
        } catch (Exception e) {
            log.error("解析MySQL结果集异常:", e);
            return null;
        }
    }

    /** Selects the next batch of non-deleted videos from {@code tableName}, ordered by {@code no}. */
    @Override
    protected String getSql(String tableName, String timeWhere) {
        String videoDb = myConfig.getMysqlVideoDb();
        Integer limit = myConfig.getTransferLimit();
        return String.format("SELECT `id`, `title`, `thumbUrl`, `url`, `source`, `createTime`, `no` FROM `%s`.`%s` WHERE `del_flag` = 0 %s ORDER BY `no` ASC LIMIT %d;", videoDb, tableName, timeWhere, limit);
    }

    /** Refreshes the video category list, 15 s after the previous refresh finished. */
    @Override
    @Scheduled(fixedDelay = 15_000)
    public void fetchCategoryList() {
        fetchCategoryList("/list/video");
    }

    /** Runs one video sync pass, 6 s after the previous pass finished. */
    @Override
    @Scheduled(fixedDelay = 6_000)
    public void main() {
        super.run();
    }
}
6.启动类ContentTransferToEsApplication
package cc.itmx.util;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point. {@link EnableScheduling} activates the {@code @Scheduled} sync jobs.
 *
 * @author ITMX
 */
@SpringBootApplication
@EnableScheduling
public class ContentTransferToEsApplication {
    public static void main(String[] args) {
        // equivalent to SpringApplication.run(ContentTransferToEsApplication.class, args)
        new SpringApplication(ContentTransferToEsApplication.class).run(args);
    }
}
3)使用 HTTP 请求 es 接口搜索文章
一旦有数据存入我们的es中,那我们就可以通过http调用es的接口搜索内容了
POST 查询时提交的 JSON 数据：
{
  "query": {
    "query_string": {
      "query": "中国",
      "fields": ["title"],
      "minimum_should_match": "80%"
    }
  },
  "from": 0,
  "size": 10,
  "sort": [{"createTime": {"unmapped_type": "date", "order": "desc"}}]
}
获取第二页时，可在上面的 JSON 中加入 "search_after": ["上一页最后一条的时间戳"]（注意：标准 JSON 不支持注释，键名也必须加双引号）。
# Elasticsearch 6.0+ enforces strict Content-Type checking, so the JSON body must be declared explicitly.
curl -X POST -H 'Content-Type: application/json' -d '上面的JSON数据' http://192.168.1.251:9200/_search
4)使用 Lua 调用搜索接口
略,与项目耦合较高,缺少上下文不容易理解,暂不贴出
参考链接:
http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.5/index.html
https://www.yiibai.com/elasticsearch/elasticsearch_modules.html