成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

[Spring cloud 一步步實現(xiàn)廣告系統(tǒng)] 15. 使用開源組件監(jiān)聽Binlog 實現(xiàn)增量索引

darryrzhong / 2497人閱讀

摘要:不會記錄數(shù)據(jù)表的列名在接下來的實現(xiàn)中,我們會將自己的系統(tǒng)包裝成一個假的,通過開源工具來實現(xiàn)監(jiān)聽。因為我們只需要中的內(nèi)容,那么我們也就只需要通過實現(xiàn)接口,來自定義一個監(jiān)聽器實現(xiàn)我們的業(yè)務(wù)即可。

MySQL Binlog簡介

什么是binlog?

一個二進制日志,用來記錄對數(shù)據(jù)發(fā)生或潛在發(fā)生更改的SQL語句,并以而進行的形式保存在磁盤中。

binlog 的作用?

最主要有3個用途:

數(shù)據(jù)復制(主從同步)

Mysql 的Master-Slave協(xié)議,讓Slave可以通過監(jiān)聽binlog實現(xiàn)數(shù)據(jù)復制,達到數(shù)據(jù)一致性目的

數(shù)據(jù)恢復

通過mysqlbinlog工具恢復數(shù)據(jù)

增量備份

Binlog 變量

log_bin (Binlog 開關(guān),使用show variables like "log_bin";查看)

binlog_format (Binlog 日志格式,使用show variables like "binlog_format";查看)

日志格式總共有三種:

ROW, 僅保存記錄被修改的細節(jié),不記錄SQL語句上下文相關(guān)信息。(能清晰的記錄下每行數(shù)據(jù)的修改細節(jié),不需要記錄上下文相關(guān)信息,因此不會發(fā)生某些特定情況下的procedure、function以及trigger 的調(diào)用無法被準確復制的問題,任何情況下都可以被復制,且能加快從庫重放日志的效率,保證從庫數(shù)據(jù)的一致性)

STATEMENT,每一條修改數(shù)據(jù)的SQL都會被記錄。(只記錄執(zhí)行語句的細節(jié)和上下文環(huán)境,避免了記錄每一行的變化,在一些修改記錄較多的情況下,相比ROW類型能大大減少binlog的日志量,節(jié)約IO,提高性能。還可以用于實時的還原,同時主從版本可以不一樣,從服務(wù)器版本可以比主服務(wù)器版本高)

MIXED, 上述2種的混合使用

Binlog 管理

show master logs; 查看所有binlog的日志列表

show master status; 查看最后一個binlog日志編號名稱,以及最后一個事件技術(shù)的位置(position)

Flush logs; 刷新binlog,此刻開始產(chǎn)生一個新編號的binlog日志文件

reset master; 清空所有的binlog日志

Binlog 相關(guān)SQL show binlog events[in "log_name"][from position][limit [offset,]row_count]

常用的Binlog event

QUERY - 與數(shù)據(jù)無關(guān)的操作,begin、drop table、truncate table等等

TABLE_MAP - 記錄下一個操作所對應(yīng)的表信息,存儲了數(shù)據(jù)庫名稱和表名稱

XID - 標記事務(wù)提交

WRITE_ROWS 插入數(shù)據(jù),即insert操作

UPDATE_ROWS 更新數(shù)據(jù),即update操作

DELETE_ROWS 刪除數(shù)據(jù),即delete操作

Event包含header和data兩部分,header提供了event的創(chuàng)建時間,哪個服務(wù)器等信息,data部分提供的是針對該event的具體信息,如具體數(shù)據(jù)的修改。

Tip: binlog不會記錄數(shù)據(jù)表的列名

在接下來的實現(xiàn)中,我們會將自己的系統(tǒng)包裝成一個假的Mysql Slave,通過開源工具mysql-binlog-connector-java來實現(xiàn)監(jiān)聽binlog。

開源工具mysql-binlog-connector-java

工具源碼:Github傳送門

組件使用

1.加依賴



    com.github.shyiko
    mysql-binlog-connector-java
    0.18.1

2.創(chuàng)建一個測試接口

package com.sxzhongf.ad.service;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

import java.io.IOException;

public class BinlogServiceTest {

    public static void main(String[] args) throws IOException {

      //構(gòu)造BinaryLogClient,填充mysql鏈接信息
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
                "root", "12345678"
        );

        //設(shè)置需要讀取的Binlog的文件以及位置,否則,client會從"頭"開始讀取Binlog并監(jiān)聽
        //client.setBinlogFilename("binlog.000035");
        //client.setBinlogPosition();

        //給客戶端注冊監(jiān)聽器,實現(xiàn)對Binlog的監(jiān)聽和解析
        //event 就是監(jiān)聽到的Binlog變化信息,event包含header & data 兩部分
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof UpdateRowsEventData) {
                System.out.println("--------Update-----------");
                System.out.println(data.toString());
            } else if (data instanceof WriteRowsEventData) {
                System.out.println("--------Insert-----------");
                System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("--------Delete-----------");
                System.out.println(data.toString());
            }
        });
        client.connect();
    }
  }

運行:

八月 08, 2019 9:13:32 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 127.0.0.1:3306 at binlog.000038/951 (sid:65535, cid:336)
...

執(zhí)行sql update ad_user set user_status=1 where user_id=10;

我們需要知道的是,我們的目的是實現(xiàn)對Mysql數(shù)據(jù)表的變更實現(xiàn)監(jiān)聽,并解析成我們想要的格式,也就是我們的java對象。根據(jù)上面我們看到的監(jiān)聽結(jié)果,我們知道了返回信息的大概內(nèi)容,既然我們已經(jīng)學會了簡單的使用BinaryLogClient 來監(jiān)聽binlog,接下來,我們需要定義一個監(jiān)聽器,來實現(xiàn)我們自己的業(yè)務(wù)內(nèi)容。

因為我們只需要Event中的內(nèi)容,那么我們也就只需要通過實現(xiàn)com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener接口,來自定義一個監(jiān)聽器實現(xiàn)我們的業(yè)務(wù)即可。通過Event的內(nèi)容,來判定是否需要處理當前event以及如何處理。

構(gòu)造解析binlog的模版文件

我們監(jiān)聽binlog來構(gòu)造增量數(shù)據(jù)的根本原因,是為了將我們的廣告投放系統(tǒng)廣告檢索系統(tǒng) 業(yè)務(wù)解耦,由于我們的檢索系統(tǒng)中沒有定義數(shù)據(jù)庫以及數(shù)據(jù)表的相關(guān),所以,我們通過定義一份模版文件,通過解析模版文件來得到我們需要的數(shù)據(jù)庫和表信息,因為binlog的監(jiān)聽是不區(qū)分是哪個數(shù)據(jù)庫和哪個數(shù)據(jù)表信息的,我們可以通過模版來指定我們想要監(jiān)聽的部分。

{
  "database": "advertisement",
  "tableList": [
    {
      "tableName": "ad_plan",
      "level": 2,
      "insert": [
        {
          "column": "plan_id"
        },
        {
          "column": "user_id"
        },
        {
          "column": "plan_status"
        },
        {
          "column": "start_date"
        },
        {
          "column": "end_date"
        }
      ],
      "update": [
        {
          "column": "plan_id"
        },
        {
          "column": "user_id"
        },
        {
          "column": "plan_status"
        },
        {
          "column": "start_date"
        },
        {
          "column": "end_date"
        }
      ],
      "delete": [
        {
          "column": "plan_id"
        }
      ]
    },
    {
      "tableName": "ad_unit",
      "level": 3,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "unit_status"
        },
        {
          "column": "position_type"
        },
        {
          "column": "plan_id"
        }
      ],
      "update": [
        {
          "column": "unit_id"
        },
        {
          "column": "unit_status"
        },
        {
          "column": "position_type"
        },
        {
          "column": "plan_id"
        }
      ],
      "delete": [
        {
          "column": "unit_id"
        }
      ]
    },
    {
      "tableName": "ad_creative",
      "level": 2,
      "insert": [
        {
          "column": "creative_id"
        },
        {
          "column": "type"
        },
        {
          "column": "material_type"
        },
        {
          "column": "height"
        },
        {
          "column": "width"
        },
        {
          "column": "audit_status"
        },
        {
          "column": "url"
        }
      ],
      "update": [
        {
          "column": "creative_id"
        },
        {
          "column": "type"
        },
        {
          "column": "material_type"
        },
        {
          "column": "height"
        },
        {
          "column": "width"
        },
        {
          "column": "audit_status"
        },
        {
          "column": "url"
        }
      ],
      "delete": [
        {
          "column": "creative_id"
        }
      ]
    },
    {
      "tableName": "relationship_creative_unit",
      "level": 3,
      "insert": [
        {
          "column": "creative_id"
        },
        {
          "column": "unit_id"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "creative_id"
        },
        {
          "column": "unit_id"
        }
      ]
    },
    {
      "tableName": "ad_unit_district",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "province"
        },
        {
          "column": "city"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "province"
        },
        {
          "column": "city"
        }
      ]
    },
    {
      "tableName": "ad_unit_hobby",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "hobby_tag"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "hobby_tag"
        }
      ]
    },
    {
      "tableName": "ad_unit_keyword",
      "level": 4,
      "insert": [
        {
          "column": "unit_id"
        },
        {
          "column": "keyword"
        }
      ],
      "update": [
      ],
      "delete": [
        {
          "column": "unit_id"
        },
        {
          "column": "keyword"
        }
      ]
    }
  ]
}

上面的模版文件中,指定了一個數(shù)據(jù)庫為advertisement,大家可以方便添加多個監(jiān)聽庫。在數(shù)據(jù)庫下面,我們監(jiān)聽了幾個表的CUD操作以及每個操作所需要的字段信息。

實現(xiàn)模版 —> Java Entity

定義模版文件對應(yīng)的實體

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogTemplate {
        //單數(shù)據(jù)庫對應(yīng)
    private String database;
      //多表
    private List tableList;
}

對應(yīng)的json 中 table信息

/**
 * JsonTable for 用于表示template.json中對應(yīng)的表信息
 *
 * @author Isaac.Zhang | 若初
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class JsonTable {
    private String tableName;
    private Integer level;

    private List insert;
    private List update;
    private List delete;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Column {
        private String columnName;
    }
}

讀取的對應(yīng)表信息對象(最主要目的就是為了能將字段索引 映射到 字段名稱

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TableTemplate {
    private String tableName;
    private String level;

      //操作類型 -> 多列
    private Map> opTypeFieldSetMap = new HashMap<>();

    /**
     * Binlog日志中 字段索引 -> 字段名稱 的一個轉(zhuǎn)換映射
     * 因為binlog中不會顯示更新的列名是什么,它只會展示字段的索引,因此我們需要實現(xiàn)一次轉(zhuǎn)換
     */
    private Map posMap = new HashMap<>();
}

解析模版文件到j(luò)ava對象

@Data
public class ParseCustomTemplate {

    private String database;

    /**
     * key -> TableName
     * value -> {@link TableTemplate}
     */
    private Map tableTemplateMap;

    public static ParseCustomTemplate parse(BinlogTemplate _template) {
        ParseCustomTemplate template = new ParseCustomTemplate();
        template.setDatabase(_template.getDatabase());

        for (JsonTable jsonTable : _template.getTableList()) {
            String name = jsonTable.getTableName();
            Integer level = jsonTable.getLevel();

            TableTemplate tableTemplate = new TableTemplate();
            tableTemplate.setTableName(name);
            tableTemplate.setLevel(level.toString());
            template.tableTemplateMap.put(name, tableTemplate);

            //遍歷操作類型對應(yīng)的列信息
            Map> operationTypeListMap = tableTemplate.getOpTypeFieldSetMap();

            for (JsonTable.Column column : jsonTable.getInsert()) {
                getAndCreateIfNeed(
                        OperationTypeEnum.ADD,
                        operationTypeListMap,
                        ArrayList::new
                ).add(column.getColumnName());
            }

            for (JsonTable.Column column : jsonTable.getUpdate()) {
                getAndCreateIfNeed(
                        OperationTypeEnum.UPDATE,
                        operationTypeListMap,
                        ArrayList::new
                ).add(column.getColumnName());
            }

            for (JsonTable.Column column : jsonTable.getDelete()) {
                getAndCreateIfNeed(
                        OperationTypeEnum.DELETE,
                        operationTypeListMap,
                        ArrayList::new
                ).add(column.getColumnName());
            }
        }

        return template;
    }

    /**
     * 從Map中獲取對象,如果不存在,創(chuàng)建一個
     */
    private static  R getAndCreateIfNeed(T key, Map map, Supplier factory) {
        return map.computeIfAbsent(key, k -> factory.get());
    }
}

解析 字段索引 -> 字段名稱 的一個轉(zhuǎn)換映射

首先,我們來看一下binlog的具體日志信息:

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

可以看到,在日志中includedColumns只包含了{0, 1, 2, 3, 4, 5}位置信息,那么我們怎么能知道它具體代表的是哪個字段呢,接下來我們來實現(xiàn)這步映射關(guān)系,在實現(xiàn)之前,我們先來查詢一下數(shù)據(jù)庫中我們的表中字段所處的具體位置:

sql> SELECT table_schema,table_name,column_name,ordinal_position FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = "advertisement" AND TABLE_NAME="ad_user"

我們可以看到ordinal_position對應(yīng)的是1-6,可是上面監(jiān)聽到的binlog日志索引是0-5,所以我們就可以看出來之間的對應(yīng)關(guān)系。

我們開始編碼實現(xiàn),我們使用JdbcTemplate進行查詢數(shù)據(jù)庫信息:

@Slf4j
@Component
public class TemplateHolder {
    private ParseCustomTemplate template;

    private final JdbcTemplate jdbcTemplate;

    private String SQL_SCHEMA = "SELECT TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION FROM information_schema.COLUMNS " +
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";

    @Autowired
    public TemplateHolder(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * 需要在容器加載的時候,就載入數(shù)據(jù)信息
     */
    @PostConstruct
    private void init() {
        loadJSON("template.json");
    }

    /**
     * 對外提供加載服務(wù)
     */
    public TableTemplate getTable(String tableName) {
        return template.getTableTemplateMap().get(tableName);
    }

    /**
     * 加載需要監(jiān)聽的binlog json文件
     */
    private void loadJSON(String path) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        InputStream inputStream = classLoader.getResourceAsStream(path);

        try {
            BinlogTemplate binlogTemplate = JSON.parseObject(
                    inputStream,
                    Charset.defaultCharset(),
                    BinlogTemplate.class
            );

            this.template = ParseCustomTemplate.parse(binlogTemplate);
            loadMeta();
        } catch (IOException ex) {
            log.error((ex.getMessage()));
            throw new RuntimeException("fail to parse json file");
        }
    }

    /**
     * 加載元信息
     * 使用表索引到列名稱的映射關(guān)系
     */
    private void loadMeta() {
        for (Map.Entry entry : template.getTableTemplateMap().entrySet()) {
            TableTemplate table = entry.getValue();

            List updateFields = table.getOpTypeFieldSetMap().get(
                    OperationTypeEnum.UPDATE
            );
            List insertFields = table.getOpTypeFieldSetMap().get(
                    OperationTypeEnum.ADD
            );
            List deleteFields = table.getOpTypeFieldSetMap().get(
                    OperationTypeEnum.DELETE
            );

            jdbcTemplate.query(SQL_SCHEMA, new Object[]{
                            template.getDatabase(), table.getTableName()
                    }, (rs, i) -> {
                        int pos = rs.getInt("ORDINAL_POSITION");
                        String colName = rs.getString("COLUMN_NAME");

                        if ((null != updateFields && updateFields.contains(colName))
                            || (null != insertFields && insertFields.contains(colName))
                            || (null != deleteFields && deleteFields.contains(colName))) {
                                     table.getPosMap().put(pos - 1, colName);
                        }
                        return null;
                    }
            );
        }
    }
}

監(jiān)聽binlog實現(xiàn)

定義Event 解析所需要轉(zhuǎn)換的java對象

@Data
public class BinlogRowData {

    private TableTemplate tableTemplate;

    private EventType eventType;

    private List> before;

    private List> after;

}

- 定義binlog client `BinaryLogClient`

```java
/**
 * CustomBinlogClient for 自定義Binlog Client
 *
 * @author Isaac.Zhang | 若初
 * @since 2019/6/27
 */
@Slf4j
@Component
public class CustomBinlogClient {

    private BinaryLogClient client;

    private final BinlogConfig config;
    private final AggregationListener listener;

    @Autowired
    public CustomBinlogClient(BinlogConfig config, AggregationListener listener) {
        this.config = config;
        this.listener = listener;
    }

    public void connect() {
        new Thread(() -> {
            client = new BinaryLogClient(
                    config.getHost(),
                    config.getPort(),
                    config.getUsername(),
                    config.getPassword()
            );

            if (!StringUtils.isEmpty(config.getBinlogName()) && !config.getPosition().equals(-1L)) {
                client.setBinlogFilename(config.getBinlogName());
                client.setBinlogPosition(config.getPosition());
            }

            try {
                log.info("connecting to mysql start...");
                client.connect();
                log.info("connecting to mysql done!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    public void disconnect() {
        try {
            log.info("disconnect to mysql start...");
            client.disconnect();
            log.info("disconnect to mysql done!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

使用client注冊事件監(jiān)聽器com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener

public interface Ilistener {

    void register();

    void onEvent(BinlogRowData eventData);
}

監(jiān)聽Binlog, 收集mysql binlog datas

@Slf4j
@Component
public class AggregationListener implements BinaryLogClient.EventListener {

    private String dbName;
    private String tbName;

    private Map listenerMap = new HashMap<>();

    @Autowired
    private TemplateHolder templateHolder;

    private String genKey(String dbName, String tbName) {
        return dbName + ":" + tbName;
    }

    /**
     * 根據(jù)表實現(xiàn)注冊信息
     */
    public void register(String dbName, String tbName, Ilistener listener) {
        log.info("register : {}-{}", dbName, tbName);
        this.listenerMap.put(genKey(dbName, tbName), listener);
    }

    @Override
    public void onEvent(Event event) {

        EventType type = event.getHeader().getEventType();
        log.info("Event type: {}", type);

        //數(shù)據(jù)庫增刪改之前,肯定有一個table_map event 的binlog
        if (type == EventType.TABLE_MAP) {
            TableMapEventData data = event.getData();
            this.tbName = data.getTable();
            this.dbName = data.getDatabase();
            return;
        }

        //EXT_UPDATE_ROWS 是Mysql 8以上的type
        if (type != EventType.EXT_UPDATE_ROWS
                && type != EventType.EXT_WRITE_ROWS
                && type != EventType.EXT_DELETE_ROWS
                ) {
            return;
        }

        // 檢查表名和數(shù)據(jù)庫名是否已經(jīng)正確填充
        if (StringUtils.isEmpty(dbName) || StringUtils.isEmpty(tbName)) {
            log.error("Meta data got error. tablename:{},database:{}", tbName, dbName);
            return;
        }

        //找出對應(yīng)數(shù)據(jù)表敏感的監(jiān)聽器
        String key = genKey(this.dbName, this.tbName);
        Ilistener ilistener = this.listenerMap.get(key);
        if (null == ilistener) {
            log.debug("skip {}", key);
        }

        log.info("trigger event:{}", type.name());

        try {
            BinlogRowData rowData = convertEventData2BinlogRowData(event.getData());
            if (null == rowData) {
                return;
            }
            rowData.setEventType(type);
            ilistener.onEvent(rowData);

        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage());
        } finally {
            this.dbName = "";
            this.tbName = "";
        }
    }

    /**
     * 解析Binlog數(shù)據(jù)到Java實體對象的映射
     *
     * @param data binlog
     * @return java 對象
     */
    private BinlogRowData convertEventData2BinlogRowData(EventData data) {
        TableTemplate tableTemplate = templateHolder.getTable(tbName);
        if (null == tableTemplate) {
            log.warn("table {} not found.", tbName);
            return null;
        }

        List> afterMapList = new ArrayList<>();

        for (Serializable[] after : getAfterValues(data)) {
            Map afterMap = new HashMap<>();

            int columnLength = after.length;
            for (int i = 0; i < columnLength; ++i) {
                //取出當前位置對應(yīng)的列名
                String colName = tableTemplate.getPosMap().get(i);
                //如果沒有,則說明不需要該列
                if (null == colName) {
                    log.debug("ignore position: {}", i);
                    continue;
                }

                String colValue = after[i].toString();
                afterMap.put(colName, colValue);
            }

            afterMapList.add(afterMap);
        }

        BinlogRowData binlogRowData = new BinlogRowData();
        binlogRowData.setAfter(afterMapList);
        binlogRowData.setTableTemplate(tableTemplate);

        return binlogRowData;
    }

    /**
     * 獲取不同事件的變更后數(shù)據(jù)
     * Add & Delete變更前數(shù)據(jù)假定為空
     */
    private List getAfterValues(EventData eventData) {

        if (eventData instanceof WriteRowsEventData) {
            return ((WriteRowsEventData) eventData).getRows();
        }

        if (eventData instanceof UpdateRowsEventData) {
            return ((UpdateRowsEventData) eventData).getRows()
                                                    .stream()
                                                    .map(Map.Entry::getValue)
                                                    .collect(Collectors.toList()
                                                    );
        }

        if (eventData instanceof DeleteRowsEventData) {
            return ((DeleteRowsEventData) eventData).getRows();
        }

        return Collections.emptyList();
    }
}

解析binlog 數(shù)據(jù)對象BinlogRowData ,用于增量索引的后續(xù)處理

/**
 * MysqlRowData for 簡化{@link BinlogRowData} 以方便實現(xiàn)增量索引的實現(xiàn)
 *
 * @author Isaac.Zhang | 若初
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MysqlRowData {

    //實現(xiàn)多數(shù)據(jù)的時候,需要傳遞數(shù)據(jù)庫名稱
    //private String database;
    private String tableName;
    private String level;
    private OperationTypeEnum operationTypeEnum;
    private List> fieldValueMap = new ArrayList<>();
}

因為我們需要將Binlog EventType轉(zhuǎn)換為我們的操作類型OperationTypeEnum,所以,我們在OperationTypeEnum中添加一個轉(zhuǎn)換方法:

    public enum OperationTypeEnum {
    ...
        public static OperationTypeEnum convert(EventType type) {
            switch (type) {
                case EXT_WRITE_ROWS:
                    return ADD;
                case EXT_UPDATE_ROWS:
                    return UPDATE;
                case EXT_DELETE_ROWS:
                    return DELETE;
                default:
                    return OTHER;
            }
        }
    }

我們還需要定義一個表包含的各個列名稱的java類,方便我們后期對數(shù)據(jù)表的CUD操作:

package com.sxzhongf.ad.mysql.constant;

import java.util.HashMap;
import java.util.Map;

public class Constant {

    private static final String DATABASE_NAME = "advertisement";

    public static class AD_PLAN_TABLE_INFO {

        public static final String TABLE_NAME = "ad_plan";

        public static final String COLUMN_PLAN_ID = "plan_id";
        public static final String COLUMN_USER_ID = "user_id";
        public static final String COLUMN_PLAN_STATUS = "plan_status";
        public static final String COLUMN_START_DATE = "start_date";
        public static final String COLUMN_END_DATE = "end_date";
    }

    public static class AD_CREATIVE_TABLE_INFO {

        public static final String TABLE_NAME = "ad_creative";

        public static final String COLUMN_CREATIVE_ID = "creative_id";
        public static final String COLUMN_TYPE = "type";
        public static final String COLUMN_MATERIAL_TYPE = "material_type";
        public static final String COLUMN_HEIGHT = "height";
        public static final String COLUMN_WIDTH = "width";
        public static final String COLUMN_AUDIT_STATUS = "audit_status";
        public static final String COLUMN_URL = "url";
    }

    public static class AD_UNIT_TABLE_INFO {

        public static final String TABLE_NAME = "ad_unit";

        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_UNIT_STATUS = "unit_status";
        public static final String COLUNN_POSITION_TYPE = "position_type";
        public static final String COLUNN_PLAN_ID = "plan_id";
    }

    public static class RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO {

        public static final String TABLE_NAME = "relationship_creative_unit";

        public static final String COLUMN_CREATIVE_ID = "creative_id";
        public static final String COLUMN_UNIT_ID = "unit_id";
    }

    public static class AD_UNIT_DISTRICT_TABLE_INFO {

        public static final String TABLE_NAME = "ad_unit_district";

        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_PROVINCE = "province";
        public static final String COLUMN_CITY = "city";
    }

    public static class AD_UNIT_KEYWORD_TABLE_INFO {

        public static final String TABLE_NAME = "ad_unit_keyword";

        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_KEYWORD = "keyword";
    }

    public static class AD_UNIT_HOBBY_TABLE_INFO {

        public static final String TABLE_NAME = "ad_unit_hobby";

        public static final String COLUMN_UNIT_ID = "unit_id";
        public static final String COLUMN_HOBBY_TAG = "hobby_tag";
    }

    //key -> 表名
    //value -> 數(shù)據(jù)庫名
    public static Map table2db;

    static {
        table2db = new HashMap<>();
        table2db.put(AD_PLAN_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_CREATIVE_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(RELATIONSHIP_CREATIVE_UNIT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_DISTRICT_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_HOBBY_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
        table2db.put(AD_UNIT_KEYWORD_TABLE_INFO.TABLE_NAME, DATABASE_NAME);
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/75996.html

相關(guān)文章

  • [Spring cloud 步步實現(xiàn)廣告系統(tǒng)] 12. 廣告索引介紹

    摘要:索引設(shè)計介紹在我們廣告系統(tǒng)中,為了我們能更快的拿到我們想要的廣告數(shù)據(jù),我們需要對廣告數(shù)據(jù)添加類似于數(shù)據(jù)庫一樣的索引結(jié)構(gòu),分兩大類正向索引和倒排索引。如何在廣告系統(tǒng)中使用倒排索引核心用途是對各個維度限制的整理。 索引設(shè)計介紹 在我們廣告系統(tǒng)中,為了我們能更快的拿到我們想要的廣告數(shù)據(jù),我們需要對廣告數(shù)據(jù)添加類似于數(shù)據(jù)庫index一樣的索引結(jié)構(gòu),分兩大類:正向索引和倒排索引。 正向索引 通過...

    endless_road 評論0 收藏0
  • [Spring cloud 步步實現(xiàn)廣告系統(tǒng)] 14. 全量索引代碼實現(xiàn)

    摘要:各個表數(shù)據(jù)的存儲文件名定義索引對象導出的字段信息依然用為例。通用處理索引類索引之間存在層級劃分,也就是相互之間擁有依賴關(guān)系的劃分加載全量索引其實是增量索引添加的一種特殊實現(xiàn)若初實現(xiàn)廣告推廣計劃的第二層級索引實現(xiàn)。 上一節(jié)我們實現(xiàn)了索引基本操作的類以及索引緩存工具類,本小節(jié)我們開始實現(xiàn)加載全量索引數(shù)據(jù),在加載全量索引數(shù)據(jù)之前,我們需要先將數(shù)據(jù)庫中的表數(shù)據(jù)導出到一份文件中。Lets cod...

    MycLambert 評論0 收藏0
  • [Spring cloud 步步實現(xiàn)廣告系統(tǒng)] 13. 索引服務(wù)編碼實現(xiàn)

    摘要:上一節(jié)我們分析了廣告索引的維護有種,全量索引加載和增量索引維護。因為廣告檢索是廣告系統(tǒng)中最為重要的環(huán)節(jié),大家一定要認真理解我們索引設(shè)計的思路,接下來我們來編碼實現(xiàn)索引維護功能。 上一節(jié)我們分析了廣告索引的維護有2種,全量索引加載和增量索引維護。因為廣告檢索是廣告系統(tǒng)中最為重要的環(huán)節(jié),大家一定要認真理解我們索引設(shè)計的思路,接下來我們來編碼實現(xiàn)索引維護功能。 我們來定義一個接口,來接收所有...

    stefanieliang 評論0 收藏0
  • [Spring cloud 步步實現(xiàn)廣告系統(tǒng)] 8. 檢索系統(tǒng)配置&依賴

    摘要:工作流程項目依賴監(jiān)控面板引入服務(wù)調(diào)用的組件依賴引入服務(wù)消費者的依賴數(shù)據(jù)庫鏈接依賴工具類集合類操作日志監(jiān)聽解析開源工具類庫中的配置相關(guān)依賴圖片壓縮 工作流程 showImg(https://i.loli.net/2019/07/29/5d3ee1829df4d57461.png); 項目依賴 org.springframewo...

    dailybird 評論0 收藏0
  • [Spring cloud 步步實現(xiàn)廣告系統(tǒng)] 7. 中期總結(jié)回顧

    摘要:在前面的過程中,我們創(chuàng)建了個服務(wù)發(fā)現(xiàn)我們使用作為服務(wù)發(fā)現(xiàn)組件,學習了的使用。加依賴加注解改配置使用項目三部曲,我們可以快速添加一個新組件,并正常使用這個我沒有在項目中實現(xiàn),但是大家可以和一樣,三部曲搞定。 在前面的過程中,我們創(chuàng)建了4個project: 服務(wù)發(fā)現(xiàn) 我們使用Eureka 作為服務(wù)發(fā)現(xiàn)組件,學習了Eureka Server,Eureka Client的使用。 Eureka...

    cnsworder 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<