First steps with Elasticsearch

The product team wanted an article search feature. With API performance in mind, I decided to build it on Elasticsearch (ES).

0) Implementation approach

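First, fetch the list of open categories from the content service over HTTP and iterate over them one by one.
Then, for each category name, query a limited batch of rows from the matching MySQL table in ascending order (the code below sorts by the no column) and record the no of the last row in Redis.
Next, lightly process each row and write it into ES through the ES API.
A few seconds later, repeat the process, this time passing the recorded no so that rows fetched in earlier rounds are skipped.
That's the plan! The full code follows.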
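
The loop described above can be sketched in memory before wiring in the real services. This is only an illustration under assumptions: the class CursorSyncSketch and its members are hypothetical, plain collections stand in for MySQL, Redis and ES, and no is assumed to be a positive, increasing integer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of the incremental sync loop; every name here is hypothetical. */
class CursorSyncSketch {
    /** Stands in for Redis: table name -> last synced no. */
    final Map<String, Long> cursors = new HashMap<>();
    /** Stands in for ES: collects every no that was "indexed". */
    final List<Long> indexed = new ArrayList<>();

    /** Stands in for the MySQL query: rows with no > lastNo, ascending, at most limit rows. */
    static List<Long> fetch(List<Long> sortedTable, long lastNo, int limit) {
        List<Long> batch = new ArrayList<>();
        for (long no : sortedTable) {
            if (no > lastNo && batch.size() < limit) {
                batch.add(no);
            }
        }
        return batch;
    }

    /** One pass over one table: fetch a batch, index it, then advance the cursor. */
    int syncOnce(String table, List<Long> sortedTable, int limit) {
        // Assumes no starts above 0, so 0 means "nothing synced yet".
        long lastNo = cursors.getOrDefault(table, 0L);
        List<Long> batch = fetch(sortedTable, lastNo, limit);
        if (batch.isEmpty()) {
            return 0;
        }
        indexed.addAll(batch);
        // The cursor moves only after the whole batch has been handed over.
        cursors.put(table, batch.get(batch.size() - 1));
        return batch.size();
    }
}
```

Calling syncOnce repeatedly with the same table walks through it batch by batch and returns 0 once everything has been synced, which is the behaviour the scheduled job relies on.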

1) Start an ES instance with Docker

Let's start an ES instance with Docker first. The itmx/dzkd-content-es:6.5.4 image is the official image with nothing added except the analysis-ik Chinese analysis plugin.

version: '2.2'
services:
  elasticsearch:
    image: itmx/dzkd-content-es:6.5.4
    restart: always
    environment:
      - cluster.name=content-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - esnet

  redis:
    image: daocloud.io/library/redis:5.0
    volumes:
      - ./redis-data:/data
    restart: always
    ports:
      - "6379:6379"
    networks:
      - esnet

volumes:
  esdata1:

networks:
  esnet:

2) Write the Java program that syncs the data

My database holds a large number of articles that need to be synced to the ES instance. Here I use elasticsearch-rest-high-level-client to write the data into ES.

1. The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cc.itmx.util</groupId>
    <artifactId>itmx-content-transfer-to-es</artifactId>
    <version>1.0.0</version>
    <name>itmx-content-transfer-to-es</name>
    <description>Syncs content data from MySQL into Elasticsearch</description>

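    <properties>
        <java.version>1.8</java.version>
        <!-- Spring Boot 2.1.x manages an older Elasticsearch version by default;
             align the transitive ES artifacts with the 6.5.4 client declared below -->
        <elasticsearch.version>6.5.4</elasticsearch.version>
    </properties>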

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.1.2.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.5.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.12.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

2. The configuration class MyConfig

package cc.itmx.util.common;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @author ITMX
 */
@Data
@Component
@ConfigurationProperties(prefix = "my-config")
public class MyConfig {
    private String esHost;
    private Integer esPort;
    private String esScheme;
    private String esUser;
    private String esPasswd;

    private String mysqlUri;

    private String mysqlUsername;
    private String mysqlPassword;

    private String contentHost;

    private String mysqlNewsDb;
    private String mysqlVideoDb;

    private Integer transferLimit;
}

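The corresponding application.yml file (es-port must be the port on which ES is actually reachable; the compose file above publishes 9200)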

my-config:
  es-host: 192.168.1.251
  es-port: 19200
  es-scheme: http
  mysql-uri: jdbc:mysql://192.168.1.251:3306
  mysql-username: root
  mysql-password: ******
  mysql-news-db: news-db
  mysql-video-db: video-db
  content-host: http://192.168.1.251:81
  transfer-limit: 1000
spring:
  redis:
    host: 192.168.1.251
    port: 6379
    database: 1
    password:
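
The kebab-case keys under my-config end up in the camelCase fields of MyConfig (es-host in esHost, mysql-news-db in mysqlNewsDb, and so on). The sketch below only illustrates that naming correspondence; it is not Spring Boot's binder, and the class RelaxedNameSketch is a hypothetical helper.

```java
/** Illustrates how a kebab-case YAML key corresponds to a camelCase field name (hypothetical helper). */
class RelaxedNameSketch {
    static String kebabToCamel(String key) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : key.toCharArray()) {
            if (c == '-') {
                // Drop the dash and capitalize the character that follows it.
                upperNext = true;
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

A key that has no matching field in MyConfig, or a field that has no key in the file, simply stays unbound, which is why esUser and esPasswd are null with the file above.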

3. The abstract parent class AbstractTransfer, which holds the shared methods

package cc.itmx.util.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;

import java.sql.Connection;
import java.sql.*;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

/**
 * @author ITMX
 */
@Slf4j
public abstract class AbstractTransfer {

    private final OkHttpClient HTTP_CLIENT = new OkHttpClient();

    private Set<String> tableList = new HashSet<>();

    private final RedisTemplate<String, String> redisTemplate;
    protected final MyConfig myConfig;
    private final RestHighLevelClient client;
    private final CredentialsProvider credentialsProvider;

    protected AbstractTransfer(RedisTemplate<String, String> redisTemplate, MyConfig myConfig, RestHighLevelClient client) {
        this.redisTemplate = redisTemplate;
        this.myConfig = myConfig;

        credentialsProvider = !StringUtils.isEmpty(myConfig.getEsUser()) && !StringUtils.isEmpty(myConfig.getEsPasswd())? new BasicCredentialsProvider():null;
        if(credentialsProvider != null) {
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(myConfig.getEsUser(), myConfig.getEsPasswd()));
        }

        this.client = client == null? new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(myConfig.getEsHost(),
                                myConfig.getEsPort(),
                                myConfig.getEsScheme())).setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                            if(credentialsProvider != null){
                                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                            return httpAsyncClientBuilder;
                })): client;

        configMappings(String.format("%s://%s:%d/content", myConfig.getEsScheme(), myConfig.getEsHost(), myConfig.getEsPort()));
    }

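    /**
     * Converts the current row of the MySQL result set into a document.
     * @param rsmd      result set metadata
     * @param colCount  number of columns in the result set
     * @param rs        result set positioned on the current row
     * @param tableName source table (category) name
     * @return the document fields, or null to skip the row
     */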
    public abstract Map<String, String> handelResultSet(ResultSetMetaData rsmd, int colCount, ResultSet rs, String tableName);

    private LinkedList<Map<String, String>> fetchData(String tableName) {
        LinkedList<Map<String, String>> list = new LinkedList<>();
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            log.error("MySQL driver not found: {}", e.getMessage());
            throw new RuntimeException("MySQL driver not found", e);
        }

        // Resume after the last no that was successfully indexed for this table, if any.
        String lastNo = redisTemplate.opsForValue().get(this.getClass().getSimpleName() + "lastNo:" + tableName);
        String timeWhere = StringUtils.isEmpty(lastNo) ? "" : String.format(" AND `no` > %s ", lastNo);

        // try-with-resources closes rs, ps and conn on every exit path
        // (the earlier version leaked the connection when executeQuery failed).
        try (Connection conn = DriverManager.getConnection(myConfig.getMysqlUri(), myConfig.getMysqlUsername(), myConfig.getMysqlPassword());
             PreparedStatement ps = conn.prepareStatement(getSql(tableName, timeWhere), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            log.info("Connected to MySQL, current table: {}", tableName);
            // Integer.MIN_VALUE asks MySQL Connector/J to stream rows instead of buffering the whole result.
            ps.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData rsmd = rs.getMetaData();
                int colCount = rsmd.getColumnCount();
                while (rs.next()) {
                    Map<String, String> obj = handelResultSet(rsmd, colCount, rs, tableName);
                    if (obj != null) {
                        list.add(obj);
                    }
                }
            }
        } catch (Exception e) {
            // Rows read before the failure are still returned, as before.
            log.error("Failed to read data from table [{}]: {}", tableName, e.getMessage(), e);
            return list;
        }

        log.info("Finished reading table [{}]; rows ready to transfer: {}", tableName, list.size());
        return list;
    }

    protected abstract String getSql(String tableName, String timeWhere);

    private void insertToEs(String table, LinkedList<Map<String, String>> list) {
        BulkRequest request = new BulkRequest();
        request.timeout(TimeValue.timeValueMinutes(2));

        // run() only passes non-empty lists, so getLast() is safe here.
        String lastNo = list.getLast().get("no");

        list.forEach(o -> {
            // id becomes the document _id; no is only the sync cursor, so neither is stored as a field.
            String id = o.remove("id");
            o.remove("no");
            request.add(new IndexRequest("content", "default", id).source(o));
        });

        BulkResponse bulkResponse;
        try {
            bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("Failed to submit the bulk index request:", e);
            return;
        }

        if (bulkResponse.hasFailures()) {
            // The cursor is not advanced, so the whole batch is retried on the next run.
            Stream.of(bulkResponse.getItems())
                    .filter(BulkItemResponse::isFailed)
                    .forEach(item -> log.error("Failed to index into ES: {}", item.getFailureMessage()));
        } else {
            log.info("All documents indexed, count: {}", list.size());

            redisTemplate.opsForValue().set(this.getClass().getSimpleName() + "lastNo:" + table, lastNo);
            log.info("Cursor updated: {}=>{}", table, lastNo);
        }
    }

    protected void fetchCategoryList(String url){
        Request request = new Request.Builder()
                .url(myConfig.getContentHost() + url)
                .build();
        // try-with-resources closes the response on every exit path, so no explicit close() calls are needed.
        try (Response response = HTTP_CLIENT.newCall(request).execute()) {
            if (response.code() != HttpStatus.SC_OK) {
                return;
            }
            ResponseBody body = response.body();
            if (body == null) {
                return;
            }
            String result = body.string();
            if (StringUtils.isEmpty(result)) {
                return;
            }
            JSONObject jsonObject = JSON.parseObject(result);
            JSONArray list = jsonObject.getJSONArray("list");
            tableList.addAll(list.toJavaList(String.class));
            log.info("Category list refreshed: {}", tableList);
        } catch (Exception e) {
            log.error("Failed to refresh the category list.", e);
        }
    }

    /**
     * Creates the content index with its settings and analyzer mappings
     */
    private void configMappings(String url) {
        String postJson = "{\"settings\":{\"number_of_replicas\":0,\"number_of_shards\":5},"
                + "\"mappings\":{\"default\":{\"properties\":{"
                + "\"title\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},"
                + "\"content\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},"
                + "\"sourceName\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"search_analyzer\":\"ik_max_word\"},"
                + "\"img\":{\"type\":\"text\",\"index\":false},"
                + "\"category\":{\"type\":\"text\",\"index\":false},"
                + "\"type\":{\"type\":\"text\",\"index\":false},"
                // Date pattern corrected from "yyy-..." to "yyyy-...".
                + "\"createTime\":{\"type\":\"date\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}"
                + "}}}}";

        RequestBody body = RequestBody.create(MediaType.get("application/json; charset=utf-8"), postJson);
        Request request = new Request.Builder()
                .url(url)
                .put(body)
                .build();

        try (Response response = HTTP_CLIENT.newCall(request).execute()) {
            if (response.code() == HttpStatus.SC_OK) {
                log.info("Index mappings configured");
            }else{
                // ES rejects the request when the index already exists, e.g. on every run after the first.
                log.warn("Index mappings not configured: the request failed or the index already exists");
            }
        } catch (Exception e) {
            log.error("Failed to configure index mappings", e);
            throw new RuntimeException("Failed to configure index mappings", e);
        }
    }

    protected void run() {
        log.info("Preparing data transfer...");

        tableList.forEach(table -> {
            LinkedList<Map<String, String>> list = fetchData(table);
            if (list.isEmpty()) {
                return;
            }
            insertToEs(table, list);
        });
    }

    /**
     * Entry point; subclasses call this on a schedule
     */
    protected abstract void main();
    /**
     * Refreshes the category list; subclasses call this on a schedule
     */
    protected abstract void fetchCategoryList();
}
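
AbstractTransfer formats the lastNo value read from Redis straight into the SQL text. One possible hardening, sketched below, is to accept the value only if it parses as an integer. This assumes the no column is an integer (the source compares it unquoted, but does not state its type); CursorClauseSketch and cursorClause are hypothetical names, not part of the program above.

```java
/** Builds the cursor condition only from values that parse as integers (hypothetical helper). */
class CursorClauseSketch {
    static String cursorClause(String lastNo) {
        if (lastNo == null || lastNo.trim().isEmpty()) {
            // No cursor stored yet: the first run fetches from the beginning.
            return "";
        }
        // Long.parseLong throws NumberFormatException for anything that is not a plain integer,
        // so a corrupted or tampered Redis value never reaches the SQL text.
        long no = Long.parseLong(lastNo.trim());
        return " AND `no` > " + no + " ";
    }
}
```

For example, cursorClause("42") yields the same fragment that fetchData builds today, while a value such as "1 OR 1=1" is rejected with an exception instead of being executed.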

4. The article content sync class

package cc.itmx.util.biz;

import cc.itmx.util.common.AbstractTransfer;
import cc.itmx.util.common.MyConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;

/**
 * @author ITMX
 */
@Slf4j
@Component
public class TransferNewsContent extends AbstractTransfer {

    @Autowired
    public TransferNewsContent(RedisTemplate<String, String> redisTemplate, MyConfig myConfig) {
        super(redisTemplate,myConfig,null);
    }

    @Override
    public Map<String, String> handelResultSet(ResultSetMetaData rsmd, int colCount, ResultSet rs, String tableName) {
        Map<String, String> obj = new HashMap<>(colCount + 3);
        try {
            for (int i = 1; i <= colCount; i++) {
                obj.put(rsmd.getColumnName(i), rs.getString(i));
            }
            obj.put("img", obj.get("images").split(";",2)[0]);
            obj.remove("images");
            obj.put("type","news");
            obj.put("category",tableName);
            obj.put("url", "");
        } catch (Exception e) {
            log.error("Failed to parse the MySQL result set:", e);
            return null;
        }
        return obj;
    }

    @Override
    protected String getSql(String tableName, String timeWhere) {
        return String.format("SELECT c.`id`,c.`title`,c.`sourceName`,c.`content`,c.`createTime`,l.`images`,l.`no` FROM `%s`.`%s` l INNER JOIN `%s`.`%s_content` c ON l.id = c.id AND l.del_flag = 0 %s ORDER BY l.`no` ASC LIMIT %d;", myConfig.getMysqlNewsDb(), tableName, myConfig.getMysqlNewsDb(), tableName, timeWhere, myConfig.getTransferLimit());
    }

    @Override
    @Scheduled(fixedDelay = 10_000)
    public void fetchCategoryList() {
        fetchCategoryList("/list/news");
    }

    @Override
    @Scheduled(fixedDelay = 4_000)
    public void main() {
        super.run();
    }

}
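
In handelResultSet above, a row whose images column is NULL makes obj.get("images").split(...) throw, so the exception handler returns null and that article is never indexed. If such rows should be indexed with an empty img instead, a null-safe helper along these lines could be used. This is a sketch: FirstImageSketch and firstImage are hypothetical names, and whether an empty image is acceptable is an assumption about the data.

```java
/** Extracts the first entry of a semicolon-separated image list, tolerating null (hypothetical helper). */
class FirstImageSketch {
    static String firstImage(String images) {
        if (images == null || images.isEmpty()) {
            return "";
        }
        int sep = images.indexOf(';');
        // No separator means the column holds a single image.
        return sep < 0 ? images : images.substring(0, sep);
    }
}
```

For non-null input this returns the same value as images.split(";", 2)[0].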

5. The video content sync class

package cc.itmx.util.biz;

import cc.itmx.util.common.AbstractTransfer;
import cc.itmx.util.common.MyConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;

/**
 * @author ITMX
 */
@Slf4j
@Component
public class TransferVideoList extends AbstractTransfer {

    @Autowired
    public TransferVideoList(RedisTemplate<String, String> redisTemplate, MyConfig myConfig) {
        super(redisTemplate,myConfig, null);
    }

    @Override
    public Map<String, String> handelResultSet(ResultSetMetaData rsmd, int colCount, ResultSet rs, String tableName) {
        Map<String, String> obj = new HashMap<>(colCount + 3);
        try {
            for (int i = 1; i <= colCount; i++) {
                obj.put(rsmd.getColumnName(i), rs.getString(i));
            }

            obj.put("img", obj.get("thumbUrl"));
            obj.remove("thumbUrl");

            obj.put("sourceName", obj.get("source"));
            obj.remove("source");

            if("小视频".equals(tableName)) {
                obj.put("type", "wuli");
            }else{
                obj.put("type", "video");
            }

            obj.put("category",tableName);
        } catch (Exception e) {
            log.error("Failed to parse the MySQL result set:", e);
            return null;
        }
        return obj;
    }

    @Override
    protected String getSql(String tableName, String timeWhere) {
        return String.format("SELECT `id`, `title`, `thumbUrl`, `url`, `source`, `createTime`, `no` FROM `%s`.`%s` WHERE `del_flag` = 0 %s ORDER BY `no` ASC LIMIT %d;", myConfig.getMysqlVideoDb(),tableName, timeWhere, myConfig.getTransferLimit());
    }

    @Override
    @Scheduled(fixedDelay = 15_000)
    public void fetchCategoryList() {
        fetchCategoryList("/list/video");
    }

    @Override
    @Scheduled(fixedDelay = 6_000)
    public void main() {
        super.run();
    }
}

6. The startup class ContentTransferToEsApplication

package cc.itmx.util;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * @author ITMX
 */
@EnableScheduling
@SpringBootApplication
public class ContentTransferToEsApplication {
    public static void main(String[] args) {
        SpringApplication.run(ContentTransferToEsApplication.class, args);
    }
}

3) Search articles through the ES HTTP API

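Once data has been stored in ES, we can search it by calling the ES API over HTTP.
The JSON body to POST:

{
  "query": {
    "query_string": {
      "query": "中国",
      "fields": ["title"],
      "minimum_should_match": "80%"
    }
  },
  "from": 0,
  "size": 10,
  "sort": [{"createTime": {"unmapped_type": "date", "order": "desc"}}]
}

To fetch the second page, add a search_after array holding the sort value of the last hit on the previous page (its createTime timestamp) and keep from at 0.

ES 6.x rejects request bodies sent without a JSON Content-Type header, so pass it explicitly:

curl -X POST -H 'Content-Type: application/json' -d 'the JSON above' http://192.168.1.251:9200/_search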
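
For callers that assemble the request in code, the body can be built as sketched below, with search_after added only from the second page onward. Assumptions: SearchBodySketch, escape and build are hypothetical names, the sort value of createTime is taken to be the epoch-millisecond number ES returns in each hit's sort array, and escape covers only quotes and backslashes, so it is not a general JSON encoder.

```java
/** Builds the search request body from this section, optionally with search_after (hypothetical helper). */
class SearchBodySketch {
    /** Minimal escaping for the keyword: backslashes and double quotes only. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * @param keyword     text to search for in the title field
     * @param size        page size
     * @param searchAfter sort value of the last hit on the previous page, or null for the first page
     */
    static String build(String keyword, int size, Long searchAfter) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"query\":{\"query_string\":{\"query\":\"").append(escape(keyword))
          .append("\",\"fields\":[\"title\"],\"minimum_should_match\":\"80%\"}},")
          // from stays at 0; paging is driven by search_after instead.
          .append("\"from\":0,\"size\":").append(size)
          .append(",\"sort\":[{\"createTime\":{\"unmapped_type\":\"date\",\"order\":\"desc\"}}]");
        if (searchAfter != null) {
            sb.append(",\"search_after\":[").append(searchAfter.longValue()).append("]");
        }
        return sb.append("}").toString();
    }
}
```

Because the sort uses createTime alone, hits that share the same timestamp as the last hit of a page can be skipped; adding a unique field as a second sort key avoids that.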

4) Calling the search API from Lua

Omitted: this part is tightly coupled to the project and hard to follow without context, so I'm not posting it for now.

References:
http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.5/index.html
https://www.yiibai.com/elasticsearch/elasticsearch_modules.html
