
    Elasticsearch 7.15.2: Modifying the IK Analyzer Source for MySQL 8-Based Dictionary Hot Reload
    2022-09-06 22:35:20



    I. Source Code Analysis
    1. Default hot reload

    The hot-reload mechanism provided by the official plugin:

    https://github.com/medcl/elasticsearch-analysis-ik


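    2. How the hot reload works

    The official mechanism hot-reloads the dictionary from a remote file. That is not very practical, but we can follow the same pattern to build a MySQL-based version ourselves. The official implementation lives in the org.wltea.analyzer.dic.Monitor class. It works as follows, and its complete code comes right after the list.

    • 1. Send a HEAD request to the dictionary server.
    • 2. Read the Last-Modified and ETag headers from the response and check whether they have changed.
    • 3. If nothing changed, sleep for 1 min and return to step ①.
    • 4. If something changed, call Dictionary#reLoadMainDict() to reload the dictionary.
    • 5. Sleep for 1 min and return to step ①.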
    package org.wltea.analyzer.dic;

    import java.io.IOException;
    import java.security.AccessController;
    import java.security.PrivilegedAction;

    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpHead;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.SpecialPermission;
    import org.wltea.analyzer.help.ESPluginLoggerFactory;

    public class Monitor implements Runnable {

    private static final Logger logger = ESPluginLoggerFactory.getLogger(Monitor.class.getName());

    private static CloseableHttpClient httpclient = HttpClients.createDefault();
    /*
    * Last-Modified value from the previous response
    */
    private String last_modified;
    /*
    * ETag value from the previous response
    */
    private String eTags;

    /*
    * Request URL
    */
    private String location;

    public Monitor(String location) {
    this.location = location;
    this.last_modified = null;
    this.eTags = null;
    }

    public void run() {
    SpecialPermission.check();
    AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
    this.runUnprivileged();
    return null;
    });
    }

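    /**
    * Monitoring flow:
    * ① Send a HEAD request to the dictionary server
    * ② Read the Last-Modified and ETag headers from the response and check whether they changed
    * ③ If unchanged, sleep 1 min and return to step ①
    * ④ If changed, reload the dictionary
    * ⑤ Sleep 1 min and return to step ①
    */

    public void runUnprivileged() {

    // Timeout settings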
    RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10*1000)
    .setConnectTimeout(10*1000).setSocketTimeout(15*1000).build();

    HttpHead head = new HttpHead(location);
    head.setConfig(rc);

    // Set the conditional request headers
    if (last_modified != null) {
    head.setHeader("If-Modified-Since", last_modified);
    }
    if (eTags != null) {
    head.setHeader("If-None-Match", eTags);
    }

    CloseableHttpResponse response = null;
    try {

    response = httpclient.execute(head);

    // Only act on a 200 response
    if(response.getStatusLine().getStatusCode()==200){

    if (((response.getLastHeader("Last-Modified")!=null) && !response.getLastHeader("Last-Modified").getValue().equalsIgnoreCase(last_modified))
    ||((response.getLastHeader("ETag")!=null) && !response.getLastHeader("ETag").getValue().equalsIgnoreCase(eTags))) {

    // The remote dictionary was updated: reload it and refresh last_modified and eTags
    Dictionary.getSingleton().reLoadMainDict();
    last_modified = response.getLastHeader("Last-Modified")==null?null:response.getLastHeader("Last-Modified").getValue();
    eTags = response.getLastHeader("ETag")==null?null:response.getLastHeader("ETag").getValue();
    }
    }else if (response.getStatusLine().getStatusCode()==304) {
    // Not modified, nothing to do
    //noop
    }else{
    logger.info("remote_ext_dict {} return bad code {}" , location , response.getStatusLine().getStatusCode() );
    }

    } catch (Exception e) {
    logger.error("remote_ext_dict {} error!", location, e);
    }finally{
    try {
    if (response != null) {
    response.close();
    }
    } catch (IOException e) {
    logger.error(e.getMessage(), e);
    }
    }
    }

    }
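    3. Method analysis

    reLoadMainDict() calls loadMainDict(), which in turn calls loadRemoteExtDict() to load the remote custom dictionary; likewise, loadStopWordDict() also loads the remote stop-word dictionary. reLoadMainDict() creates a new Dictionary instance, loads the dictionaries into it, and then replaces the existing ones, so it is a full replacement.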

    void reLoadMainDict() {
    logger.info("重新加載詞典...");
    // Load into a fresh instance so that loading does not disturb the dictionary currently in use
    Dictionary tmpDict = new Dictionary(configuration);
    tmpDict.configuration = getSingleton().configuration;
    tmpDict.loadMainDict();
    tmpDict.loadStopWordDict();
    _MainDict = tmpDict._MainDict;
    _StopWords = tmpDict._StopWords;
    logger.info("重新加載詞典完畢...");
    }

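    /**
    * Load the main dictionary and the extension dictionaries
    */
    private void loadMainDict() {
    // Create the main dictionary instance
    _MainDict = new DictSegment((char) 0);

    // Read the main dictionary file
    Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
    loadDictFile(_MainDict, file, false, "Main Dict");
    // Load the extension dictionaries
    this.loadExtDict();
    // Load the remote custom dictionaries
    this.loadRemoteExtDict();
    }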
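
The full-replacement idea behind reLoadMainDict() can be illustrated in isolation. The following is only a minimal sketch under simplifying assumptions: it uses a plain Set instead of IK's DictSegment trie, and the names SwappableDict, reload and contains are hypothetical, not part of the plugin.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of "build a new dictionary, then swap the reference".
class SwappableDict {
    // volatile: readers see either the old set or the new one, never a half-built set
    private volatile Set<String> words = Collections.emptySet();

    // Build a completely new set from the source, then replace the old one in a single assignment.
    void reload(List<String> source) {
        Set<String> fresh = new HashSet<>();
        for (String word : source) {
            if (word != null && !word.trim().isEmpty()) {
                fresh.add(word.trim().toLowerCase());
            }
        }
        words = Collections.unmodifiableSet(fresh);
    }

    boolean contains(String word) {
        return words.contains(word);
    }

    public static void main(String[] args) {
        SwappableDict dict = new SwappableDict();
        dict.reload(Arrays.asList("Alpha ", "beta"));
        System.out.println(dict.contains("alpha"));
        dict.reload(Arrays.asList("gamma"));
        // Words missing from the new source are gone after the swap
        System.out.println(dict.contains("alpha"));
    }
}
```

Because every reload rebuilds everything, a word that was removed from the source simply disappears on the next swap. The cost is that the whole source has to be read each time.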

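    The logic of loadRemoteExtDict() is also straightforward:

    • 1. Get the URLs of the remote dictionaries; there may be more than one.
    • 2. Request each URL in turn and fetch the remote dictionary.
    • 3. Add each remote word to the main dictionary with _MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());

    The method to focus on here is fillSegment(), which adds a word to the dictionary. Its counterpart is disableSegment(), which masks a word in the dictionary.

    private void loadRemoteExtDict() {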
    List<String> remoteExtDictFiles = getRemoteExtDictionarys();
    for (String location : remoteExtDictFiles) {
    logger.info("[Dict Loading] " + location);
    List<String> lists = getRemoteWords(location);
    // Skip this location if the extension dictionary cannot be found
    if (lists == null) {
    logger.error("[Dict Loading] " + location + " load failed");
    continue;
    }
    for (String theWord : lists) {
    if (theWord != null && !"".equals(theWord.trim())) {
    // Load the extension dictionary words into the in-memory main dictionary
    logger.info(theWord);
    _MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
    }
    }
    }

    }

    /**
    * Add a word to this dictionary segment
    * @param charArray
    */
    void fillSegment(char[] charArray){
    this.fillSegment(charArray, 0 , charArray.length , 1);
    }

    /**
    * Mask (disable) a word in the dictionary
    * @param charArray
    */
    void disableSegment(char[] charArray){
    this.fillSegment(charArray, 0 , charArray.length , 0);
    }
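
To make the difference between adding and masking a word concrete, here is a small character trie in which every node carries an "enabled word ends here" flag. It is a simplified stand-in written for illustration, not IK's actual DictSegment; the names SimpleTrie, fill, disable and contains are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a dictionary trie; NOT IK's real DictSegment.
class SimpleTrie {
    private final Map<Character, SimpleTrie> children = new HashMap<>();
    // true when the path from the root to this node spells an enabled word
    private boolean wordEnd;

    // Add a word: create any missing nodes along the path, then set the flag.
    void fill(String word) {
        SimpleTrie node = this;
        for (char c : word.toCharArray()) {
            node = node.children.computeIfAbsent(c, k -> new SimpleTrie());
        }
        node.wordEnd = true;
    }

    // Mask a word: clear the flag if the path exists; the nodes themselves stay in place.
    void disable(String word) {
        SimpleTrie node = find(word);
        if (node != null) {
            node.wordEnd = false;
        }
    }

    boolean contains(String word) {
        SimpleTrie node = find(word);
        return node != null && node.wordEnd;
    }

    // Follow the path for the word without creating nodes; null if the path is missing.
    private SimpleTrie find(String word) {
        SimpleTrie node = this;
        for (char c : word.toCharArray()) {
            node = node.children.get(c);
            if (node == null) {
                return null;
            }
        }
        return node;
    }

    public static void main(String[] args) {
        SimpleTrie dict = new SimpleTrie();
        dict.fill("hotword");
        System.out.println(dict.contains("hotword")); // true
        dict.disable("hotword");
        System.out.println(dict.contains("hotword")); // false
    }
}
```

Masking only flips a flag, so longer words that share the same prefix are unaffected, and a masked word can be re-enabled later by adding it again.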

    The Monitor class is only a watcher. It is started in the initial() method of org.wltea.analyzer.dic.Dictionary, inside the if (cfg.isEnableRemoteDict()) block of the code below.

    ...
    ...
    // Thread pool
    private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    ...
    ...

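    /**
    * Dictionary initialization. IK Analyzer initializes its dictionaries through static methods of the Dictionary class,
    * so loading only starts when the Dictionary class is first used, which lengthens the first analysis call. This method offers a way to initialize the dictionaries while the application is loading.
    *
    * @return Dictionary
    */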
    public static synchronized void initial(Configuration cfg) {
    if (singleton == null) {
    synchronized (Dictionary.class) {
    if (singleton == null) {

    singleton = new Dictionary(cfg);
    singleton.loadMainDict();
    singleton.loadSurnameDict();
    singleton.loadQuantifierDict();
    singleton.loadSuffixDict();
    singleton.loadPrepDict();
    singleton.loadStopWordDict();

    if(cfg.isEnableRemoteDict()){
    // Start the monitor threads
    for (String location : singleton.getRemoteExtDictionarys()) {
    // Initial delay of 10 (adjustable) and period of 60, both in seconds
    pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
    }
    for (String location : singleton.getRemoteExtStopWordDictionarys()) {
    pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
    }
    }

    }
    }
    }
    }
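
The scheduling call used in initial() can be tried on its own. The sketch below uses the same scheduleAtFixedRate API with millisecond delays so that it finishes quickly. It also wraps the task in a try/catch, because the JDK documentation states that if any execution of the task throws, subsequent executions are suppressed. The names SafePoller and guard are hypothetical and do not exist in the plugin.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing a periodic task that survives its own failures.
class SafePoller {
    // Wrap a task so that a runtime failure in one run does not cancel the later runs.
    static Runnable guard(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("poll failed, will retry on the next tick: " + e);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        // Fails on the first run only, to simulate a temporary outage
        Runnable flaky = () -> {
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure");
            }
        };
        // Initial delay 10 ms, period 50 ms (the article's code uses 10 s and 60 s)
        pool.scheduleAtFixedRate(guard(flaky), 10, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        pool.shutdownNow();
        System.out.println("runs = " + runs.get());
    }
}
```

Without the guard, the first failure would silently end the polling until the node is restarted.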
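    II. Dictionary Hot Reload

    Implementing MySQL-based dictionary hot reload.

    2.1. Add dependencies

    In the pom file in the project root, change the Elasticsearch version and add the MySQL 8.0 driver dependency.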

    <properties>
    <elasticsearch.version>7.15.2</elasticsearch.version>
    </properties>

    <!-- MySQL driver -->
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
    </dependency>

    The default version is 7.14.0-SNAPSHOT.

    Change it to 7.15.2.

    2.2. Database

    Create the database dianpingdb and initialize the table structure. es_extra_main and es_extra_stopword hold the main dictionary and the stop-word dictionary respectively.

    CREATE TABLE `es_extra_main` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    `word` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT 'word',
    `is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'whether the word is deleted',
    `update_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT 'update time',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


    CREATE TABLE `es_extra_stopword` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
    `word` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT 'word',
    `is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'whether the word is deleted',
    `update_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT 'update time',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
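    2.3. JDBC configuration

    Create a jdbc.properties file in the project's config folder. It records the MySQL url, driver, username and password, the SQL statements that query the main dictionary and the stop-word dictionary, and the hot-reload interval in seconds. As the two SQL statements show, this design is an incremental update rather than the official full replacement.

    Contents of jdbc.properties: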

    jdbc.url=jdbc:mysql://192.168.92.128:3306/dianpingdb?useAffectedRows=true&characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT%2B8&allowMultiQueries=true
    jdbc.username=root
    jdbc.password=123456
    jdbc.driver=com.mysql.cj.jdbc.Driver
    jdbc.update.main.dic.sql=SELECT * FROM `es_extra_main` WHERE update_time > ? order by update_time asc
    jdbc.update.stopword.sql=SELECT * FROM `es_extra_stopword` WHERE update_time > ? order by update_time asc
    jdbc.update.interval=10
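
The interval is later read as a string and handed to Long.parseLong, which throws a NumberFormatException when the key is missing or malformed, so it can help to read it defensively. The sketch below is one possible way to do that with java.util.Properties. The class JdbcSettings, its method names and the 60-second fallback are assumptions made for this example and are not part of the plugin.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for reading the hot-reload interval from jdbc.properties.
class JdbcSettings {
    // Assumed fallback value; the article does not define a default
    static final long DEFAULT_INTERVAL_SECONDS = 60;

    static Properties parse(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Return jdbc.update.interval, or the fallback when it is missing, non-numeric or not positive.
    static long intervalSeconds(Properties props) {
        String raw = props.getProperty("jdbc.update.interval");
        if (raw == null) {
            return DEFAULT_INTERVAL_SECONDS;
        }
        try {
            // Properties.load keeps trailing whitespace in values, hence the trim
            long value = Long.parseLong(raw.trim());
            return value > 0 ? value : DEFAULT_INTERVAL_SECONDS;
        } catch (NumberFormatException e) {
            return DEFAULT_INTERVAL_SECONDS;
        }
    }

    public static void main(String[] args) {
        String text = "jdbc.username=root\njdbc.update.interval=10\n";
        Properties props = parse(new StringReader(text));
        System.out.println(intervalSeconds(props)); // 10
    }
}
```

The positive-value check matters because scheduleAtFixedRate rejects a period that is zero or negative with an IllegalArgumentException.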
    2.4. Packaging configuration

    Add the MySQL driver dependency to src/main/assemblies/plugin.xml; otherwise the packaged zip will not contain the MySQL driver jar.

    <!-- MySQL driver: add this line -->
    <include>mysql:mysql-connector-java</include>
    2.5. Security policy

    Add permission java.lang.RuntimePermission "setContextClassLoader"; to src/main/resources/plugin-security.policy; otherwise the plugin fails with a permission exception.

    grant {
    // needed because of the hot reload functionality
    permission java.net.SocketPermission "*", "connect,resolve";
    permission java.lang.RuntimePermission "setContextClassLoader";
    };

    Without this permission, the following exception is thrown:

    java.lang.ExceptionInInitializerError: null
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_261]
    at java.lang.Class.forName(Unknown Source) ~[?:1.8.0_261]
    at com.mysql.cj.jdbc.NonRegisteringDriver.<clinit>(NonRegisteringDriver.java:97) ~[?:?]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_261]
    at java.lang.Class.forName(Unknown Source) ~[?:1.8.0_261]
    at org.wltea.analyzer.dic.DatabaseMonitor.lambda$new$0(DatabaseMonitor.java:72) ~[?:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_261]
    at org.wltea.analyzer.dic.DatabaseMonitor.<init>(DatabaseMonitor.java:70) ~[?:?]
    at org.wltea.analyzer.dic.Dictionary.initial(Dictionary.java:172) ~[?:?]
    at org.wltea.analyzer.cfg.Configuration.<init>(Configuration.java:40) ~[?:?]
    at org.elasticsearch.index.analysis.IkTokenizerFactory.<init>(IkTokenizerFactory.java:15) ~[?:?]
    at org.elasticsearch.index.analysis.IkTokenizerFactory.getIkSmartTokenizerFactory(IkTokenizerFactory.java:23) ~[?:?]
    at org.elasticsearch.index.analysis.AnalysisRegistry.buildMapping(AnalysisRegistry.java:379) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.index.analysis.AnalysisRegistry.buildTokenizerFactories(AnalysisRegistry.java:189) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.index.analysis.AnalysisRegistry.build(AnalysisRegistry.java:163) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.index.IndexService.<init>(IndexService.java:164) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.index.IndexModule.newIndexService(IndexModule.java:402) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.indices.IndicesService.createIndexService(IndicesService.java:526) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.indices.IndicesService.verifyIndexMetadata(IndicesService.java:599) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.gateway.Gateway.performStateRecovery(Gateway.java:129) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.gateway.GatewayService$1.doRun(GatewayService.java:227) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:751) ~[elasticsearch-6.7.2.jar:6.7.2]
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-6.7.2.jar:6.7.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:1.8.0_261]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:1.8.0_261]
    at java.lang.Thread.run(Unknown Source) [?:1.8.0_261]
    Caused by: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "setContextClassLoader")
    at java.security.AccessControlContext.checkPermission(Unknown Source) ~[?:1.8.0_261]
    at java.security.AccessController.checkPermission(Unknown Source) ~[?:1.8.0_261]
    at java.lang.SecurityManager.checkPermission(Unknown Source) ~[?:1.8.0_261]
    at java.lang.Thread.setContextClassLoader(Unknown Source) ~[?:1.8.0_261]
    at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.lambda$static$0(AbandonedConnectionCleanupThread.java:72) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(Unknown Source) ~[?:1.8.0_261]
    at java.util.concurrent.ThreadPoolExecutor.addWorker(Unknown Source) ~[?:1.8.0_261]
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) ~[?:1.8.0_261]
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Unknown Source) ~[?:1.8.0_261]
    at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.<clinit>(AbandonedConnectionCleanupThread.java:75) ~[?:?]
    ... 26 more
    2.6. Modify Dictionary
    • 1. Load the jdbc.properties file in the constructor

    loadJdbcProperties();
    • 2. Change getProperty() to public

    • 3. Add several methods for adding and removing words
      Append the following methods at the end of the class

    /**
    * Add a new word to the main dictionary
    */
    public static void addWord(String word) {
    singleton._MainDict.fillSegment(word.trim().toLowerCase().toCharArray());
    }

    /**
    * Remove (mask) a word from the main dictionary
    */
    public static void disableWord(String word) {
    singleton._MainDict.disableSegment(word.trim().toLowerCase().toCharArray());
    }

    /**
    * Add a new stop word
    */
    public static void addStopword(String word) {
    singleton._StopWords.fillSegment(word.trim().toLowerCase().toCharArray());
    }

    /**
    * Remove (mask) a stop word
    */
    public static void disableStopword(String word) {
    singleton._StopWords.disableSegment(word.trim().toLowerCase().toCharArray());
    }

    /**
    * Load jdbc.properties
    */
    public void loadJdbcProperties() {
    Path file = PathUtils.get(getDictRoot(), DatabaseMonitor.PATH_JDBC_PROPERTIES);
    // try-with-resources closes the stream even if load() fails
    try (FileInputStream in = new FileInputStream(file.toFile())) {
    props.load(in);
    logger.info("====================================properties====================================");
    for (Map.Entry<Object, Object> entry : props.entrySet()) {
    logger.info("{}: {}", entry.getKey(), entry.getValue());
    }
    logger.info("====================================properties====================================");
    } catch (IOException e) {
    logger.error("failed to read file: " + DatabaseMonitor.PATH_JDBC_PROPERTIES, e);
    }
    }
    • 4. Start our own database monitor thread in initial()
      Find the initial(Configuration cfg) method and add the following

    // Start the database monitor thread
    pool.scheduleAtFixedRate(new DatabaseMonitor(), 10, Long.parseLong(getSingleton().getProperty(DatabaseMonitor.JDBC_UPDATE_INTERVAL)), TimeUnit.SECONDS);
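    2.7. Hot-reload class

    DatabaseMonitor is the class that implements the MySQL hot reload:

    • 1. lastUpdateTimeOfMainDic and lastUpdateTimeOfStopword record the update_time of the last row processed.
    • 2. Query the rows added or deleted since the last run.
    • 3. Check the is_deleted column of each row: if it is true, disable the word; if it is false, add the word.

    Create the DatabaseMonitor class in the org.wltea.analyzer.dic package.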

    package org.wltea.analyzer.dic;

    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.SpecialPermission;
    import org.wltea.analyzer.help.ESPluginLoggerFactory;

    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.sql.*;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;

    /**
    * Update the dictionaries from MySQL
    *
    * @author gblfy
    * @date 2021-11-21
    * @WebSite gblfy.com
    */
    public class DatabaseMonitor implements Runnable {

    private static final Logger logger = ESPluginLoggerFactory.getLogger(DatabaseMonitor.class.getName());
    public static final String PATH_JDBC_PROPERTIES = "jdbc.properties";

    private static final String JDBC_URL = "jdbc.url";
    private static final String JDBC_USERNAME = "jdbc.username";
    private static final String JDBC_PASSWORD = "jdbc.password";
    private static final String JDBC_DRIVER = "jdbc.driver";
    private static final String SQL_UPDATE_MAIN_DIC = "jdbc.update.main.dic.sql";
    private static final String SQL_UPDATE_STOPWORD = "jdbc.update.stopword.sql";

    /**
    * Update interval
    */
    public final static String JDBC_UPDATE_INTERVAL = "jdbc.update.interval";

    private static final Timestamp DEFAULT_LAST_UPDATE = Timestamp.valueOf(LocalDateTime.of(LocalDate.of(2020, 1, 1), LocalTime.MIN));

    private static Timestamp lastUpdateTimeOfMainDic = null;

    private static Timestamp lastUpdateTimeOfStopword = null;

    public String getUrl() {
    return Dictionary.getSingleton().getProperty(JDBC_URL);
    }

    public String getUsername() {
    return Dictionary.getSingleton().getProperty(JDBC_USERNAME);
    }

    public String getPassword() {
    return Dictionary.getSingleton().getProperty(JDBC_PASSWORD);
    }

    public String getDriver() {
    return Dictionary.getSingleton().getProperty(JDBC_DRIVER);
    }

    public String getUpdateMainDicSql() {
    return Dictionary.getSingleton().getProperty(SQL_UPDATE_MAIN_DIC);
    }

    public String getUpdateStopwordSql() {
    return Dictionary.getSingleton().getProperty(SQL_UPDATE_STOPWORD);
    }

    /**
    * Load the MySQL driver
    */
    public DatabaseMonitor() {
    SpecialPermission.check();
    AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
    try {
    Class.forName(getDriver());
    } catch (ClassNotFoundException e) {
    logger.error("mysql jdbc driver not found", e);
    }
    return null;
    });


    }

    @Override
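    public void run() {
    SpecialPermission.check();
    AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
    Connection conn = getConnection();
    if (conn == null) {
    // getConnection() already logged the failure; try again on the next run
    return null;
    }
    try {
    // Update the main dictionary
    updateMainDic(conn);
    // Update the stop words
    updateStopword(conn);
    } catch (RuntimeException e) {
    // An uncaught exception would cancel all future scheduled runs
    logger.error("failed to update dictionaries", e);
    } finally {
    closeConnection(conn);
    }

    return null;
    });

    }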

    public Connection getConnection() {
    Connection connection = null;
    try {
    connection = DriverManager.getConnection(getUrl(), getUsername(), getPassword());
    } catch (SQLException e) {
    logger.error("failed to get connection", e);
    }
    return connection;
    }

    public void closeConnection(Connection conn) {
    if (conn != null) {
    try {
    conn.close();
    } catch (SQLException e) {
    logger.error("failed to close Connection", e);
    }
    }
    }

    public void closeRsAndPs(ResultSet rs, PreparedStatement ps) {
    if (rs != null) {
    try {
    rs.close();
    } catch (SQLException e) {
    logger.error("failed to close ResultSet", e);
    }
    }

    if (ps != null) {
    try {
    ps.close();
    } catch (SQLException e) {
    logger.error("failed to close PreparedStatement", e);
    }
    }

    }

    /**
    * Main dictionary
    */
    public synchronized void updateMainDic(Connection conn) {

    logger.info("start update main dic");
    int numberOfAddWords = 0;
    int numberOfDisableWords = 0;
    PreparedStatement ps = null;
    ResultSet rs = null;

    try {
    String sql = getUpdateMainDicSql();

    Timestamp param = lastUpdateTimeOfMainDic == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfMainDic;

    logger.info("param: " + param);

    ps = conn.prepareStatement(sql);
    ps.setTimestamp(1, param);

    rs = ps.executeQuery();

    while (rs.next()) {
    String word = rs.getString("word");
    word = word.trim();

    if (word.isEmpty()) {
    continue;
    }

    lastUpdateTimeOfMainDic = rs.getTimestamp("update_time");

    if (rs.getBoolean("is_deleted")) {
    logger.info("[main dic] disable word: {}", word);
    // Disable the word
    Dictionary.disableWord(word);
    numberOfDisableWords++;
    } else {
    logger.info("[main dic] add word: {}", word);
    // Add the word
    Dictionary.addWord(word);
    numberOfAddWords++;
    }
    }

    logger.info("end update main dic -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);

    } catch (SQLException e) {
    logger.error("failed to update main_dic", e);
    } finally {
    // Close the ResultSet and PreparedStatement
    closeRsAndPs(rs, ps);
    }
    }

    /**
    * Stop words
    */
    public synchronized void updateStopword(Connection conn) {

    logger.info("start update stopword");

    int numberOfAddWords = 0;
    int numberOfDisableWords = 0;
    PreparedStatement ps = null;
    ResultSet rs = null;
    try {
    String sql = getUpdateStopwordSql();

    Timestamp param = lastUpdateTimeOfStopword == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfStopword;

    logger.info("param: " + param);

    ps = conn.prepareStatement(sql);
    ps.setTimestamp(1, param);

    rs = ps.executeQuery();

    while (rs.next()) {
    String word = rs.getString("word");
    word = word.trim();


    if (word.isEmpty()) {
    continue;
    }

    lastUpdateTimeOfStopword = rs.getTimestamp("update_time");

    if (rs.getBoolean("is_deleted")) {
    logger.info("[stopword] disable word: {}", word);

    // Disable the stop word
    Dictionary.disableStopword(word);
    numberOfDisableWords++;
    } else {
    logger.info("[stopword] add word: {}", word);
    // Add the stop word
    Dictionary.addStopword(word);
    numberOfAddWords++;
    }
    }

    logger.info("end update stopword -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);

    } catch (SQLException e) {
    logger.error("failed to update stopword", e);
    } finally {
    // Close the ResultSet and PreparedStatement
    closeRsAndPs(rs, ps);
    }
    }
    }
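
The incremental logic of DatabaseMonitor can be exercised without a database. The sketch below replaces the JDBC query with an in-memory list and keeps only the cursor handling: select rows newer than the last processed update_time, apply them in ascending order, and advance the cursor. The names IncrementalSync, Row and apply are hypothetical, and a plain Set stands in for the IK dictionary.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model of the "update_time > ?" polling loop.
class IncrementalSync {
    // One row of es_extra_main or es_extra_stopword
    static final class Row {
        final String word;
        final boolean deleted;
        final Instant updateTime;

        Row(String word, boolean deleted, Instant updateTime) {
            this.word = word;
            this.deleted = deleted;
            this.updateTime = updateTime;
        }
    }

    // Same starting point as DEFAULT_LAST_UPDATE in the article (2020-01-01)
    private Instant cursor = Instant.parse("2020-01-01T00:00:00Z");
    final Set<String> words = new TreeSet<>();

    // Apply every row newer than the cursor, oldest first; return how many rows were applied.
    int apply(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        for (Row row : table) {
            if (row.updateTime.isAfter(cursor)) {
                changed.add(row);
            }
        }
        changed.sort((a, b) -> a.updateTime.compareTo(b.updateTime));
        for (Row row : changed) {
            if (row.deleted) {
                words.remove(row.word);
            } else {
                words.add(row.word);
            }
            cursor = row.updateTime;
        }
        return changed.size();
    }

    public static void main(String[] args) {
        IncrementalSync sync = new IncrementalSync();
        List<Row> table = new ArrayList<>();
        table.add(new Row("hotword", false, Instant.parse("2021-11-21T10:00:00Z")));
        System.out.println(sync.apply(table) + " " + sync.words);
        // A second poll with no changes applies nothing
        System.out.println(sync.apply(table) + " " + sync.words);
        // A soft delete arrives as a newer row state with deleted = true
        table.set(0, new Row("hotword", true, Instant.parse("2021-11-21T10:05:00Z")));
        System.out.println(sync.apply(table) + " " + sync.words);
    }
}
```

One property of this design is worth keeping in mind: because the comparison is strictly greater-than, a row that is committed later but carries exactly the same update_time as the cursor would be skipped. The microsecond precision of timestamp(6) makes this unlikely, but it is a limitation of cursor-based polling in general.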
    2.8. Build and package

    Run mvn clean package, then find elasticsearch-analysis-ik-7.15.2.zip in the elasticsearch-analysis-ik/target/releases directory and upload it to the Elasticsearch plugins directory (mine is /app/elasticsearch-7.15.2/plugins).

    2.9. Upload

    [Screenshot]

    2.10. Change summary

    [Screenshot]

    III. Server Operations
    3.1. Plugin directory

    Create the analysis-ik folder.

    cd /app/elasticsearch-7.15.2/plugins/
    mkdir analysis-ik
    3.2. Unzip the plugin
    unzip elasticsearch-analysis-ik-7.15.2.zip
    3.3. Move the files

    Move all of the extracted files into the analysis-ik folder.

    mv
    3.4. Directory layout

    [Screenshot]

    3.5. Copy the configuration

    Copy jdbc.properties to the Elasticsearch config directory.

    On startup the plugin loads /app/elasticsearch-7.15.2/config/analysis-ik/jdbc.properties

    cd /app/elasticsearch-7.15.2/plugins/
    cp
    3.6. Restart Elasticsearch
    cd /app/elasticsearch-7.15.2/
    bin/elasticsearch -d && tail
    3.7. Test the analyzer

    Before adding any custom words, run a test to see the baseline result.

    # Analyze 凱悅
    GET /shop/_analyze
    {
    "analyzer": "ik_smart",
    "text": "我叫凱悅"
    }

    GET /shop/_analyze
    {
    "analyzer": "ik_max_word",
    "text": "我叫凱悅"
    }

    Result: 我叫凱悅 is split into single characters.

    {
    "tokens" : [
    {
    "token" : "我",
    "start_offset" : 0,
    "end_offset" : 1,
    "type" : "CN_CHAR",
    "position" : 0
    },
    {
    "token" : "叫",
    "start_offset" : 1,
    "end_offset" : 2,
    "type" : "CN_CHAR",
    "position" : 1
    },
    {
    "token" : "凱",
    "start_offset" : 2,
    "end_offset" : 3,
    "type" : "CN_CHAR",
    "position" : 2
    },
    {
    "token" : "悅",
    "start_offset" : 3,
    "end_offset" : 4,
    "type" : "CN_CHAR",
    "position" : 3
    }
    ]
    }


    3.8. Add a word

    Insert the custom word "我叫凱悅" into the es_extra_main table in the database and commit the transaction.

    3.9. Watch the Elasticsearch console

    The console log shows that the custom word "我叫凱悅" we just added has been loaded.

    3.10. Check the analysis again
    # Analyze 凱悅
    GET /shop/_analyze
    {
    "analyzer": "ik_smart",
    "text": "我叫凱悅"
    }

    GET /shop/_analyze
    {
    "analyzer": "ik_max_word",
    "text": "我叫凱悅"
    }
    3.11. Analysis result

    "我叫凱悅" is now emitted as a single token.

    3.12. Modified source code

    https://gitee.com/gb_90/elasticsearch-analysis-ik


    Source: https://blog.51cto.com/g
