• 當前位置:首頁 > IT技術 > 數據庫 > 正文

    Flink CDC同步MySQL分庫分表數據到Iceberg數據湖實踐
    2022-04-25 23:09:35

    介紹

    Flink CDC: 捕獲數據庫完整的變更日志記錄增、刪、改等所有數據. Flink在1.11版本開始引入了Flink CDC功能,并且同時支持Table & SQL兩種形式。Flink SQL CDC是以SQL的形式編寫實時任務,并對CDC數據進行實時解析同步。相比于傳統的數據同步方案,該方案在實時性、易用性等方面有了極大的改善。

    Flink CDC 同步優勢:

    • 業務解耦:無需入侵業務,和業務完全解耦,也就是業務端無感知數據同步的存在。

    • 性能消耗:業務數據庫性能消耗小,數據同步延遲低。

    • 同步易用:使用SQL方式執行CDC同步任務,極大的降低使用維護門檻。

    • 數據完整:完整的數據庫變更記錄,不會丟失任何記錄,Flink 自身支持 Exactly Once。

    數據湖: 支持存儲多種原始數據格式、多種計算引擎、高效的元數據統一管理和海量統一數據存儲。

    Apache Iceberg: 是一個大規模數據分析的開放表格式, 是數據湖的一種解決方案.

    Iceberg 設計特點:

    • ACID:不會讀到不完整的commit數據,基于樂觀鎖實現,支持并發commit,支持Row-level delete,支持upsert操作。
    • 增量快照:Commit后的數據即可見,在Flink實時入湖場景下,數據可見根據checkpoint的時間間隔來確定的,增量形式也可回溯歷史快照。
    • 開放的表格式:對于一個真正的開放表格式,支持多種數據存儲格式,如:parquet、orc、avro等,支持多種計算引擎,如:Spark、Flink、Hive、Trino/Presto。
    • 流批接口支持:支持流式寫入、批量寫入,支持流式讀取、批量讀取

    環境準備

    準備Flink 、mysql docker鏡像 測試環境:

    docker-compose.yml:

    version: '2.1'
    services:
      sql-client:
        user: flink:flink
        image: yuxialuo/flink-sql-client:1.13.2.v1 
        depends_on:
          - jobmanager
          - mysql
        environment:
          FLINK_JOBMANAGER_HOST: jobmanager
          MYSQL_HOST: mysql
        volumes:
          - shared-tmpfs:/tmp/iceberg
      jobmanager:
        user: flink:flink
        image: flink:1.13.2-scala_2.11
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
        volumes:
          - shared-tmpfs:/tmp/iceberg
      taskmanager:
        user: flink:flink
        image: flink:1.13.2-scala_2.11
        depends_on:
          - jobmanager
        command: taskmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2
        volumes:
          - shared-tmpfs:/tmp/iceberg
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw
    
    volumes:
      shared-tmpfs:
        driver: local
        driver_opts:
          type: "tmpfs"
          device: "tmpfs"
    

    在docker-compose.yml文件同目錄下啟動flink 組件:

    docker-compose up -d

    該命令將以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。

    可以通過訪問 http://localhost:8081/ 來查看 Flink 是否運行正常

    本教程需要的 jar 包都已經被打包進 SQL-Client 容器中了,

    如果你想要在自己的 Flink 環境運行本教程,需要下載下面列出的包并且把它們放在 Flink 所在目錄的 lib 目錄下,即 FLINK_HOME/lib/。

    flink-sql-connector-mysql-cdc-2.1.0.jar
    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
    iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar
    

    當 Iceberg 0.13.0 版本發布后,你也可以在 apache official repository 下載到支持 Flink 1.13 的 iceberg-flink-runtime jar 包。

    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/?spm=a2c6h.12873639.article-detail.13.1c396ec7vRoZE8

    準備測試數據

    進入 MySQL 容器中:

    docker-compose exec mysql mysql -uroot -p123456
    

    創建數據和表,并填充數據:

    創建兩個不同的數據庫,并在每個數據庫中創建兩個表,作為 user 表分庫分表下拆分出的表。

     CREATE DATABASE db_1;
     USE db_1;
     CREATE TABLE user_1 (
       id INTEGER NOT NULL PRIMARY KEY,
       name VARCHAR(255) NOT NULL DEFAULT 'flink',
       address VARCHAR(1024),
       phone_number VARCHAR(512),
       email VARCHAR(255)
     );
     INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
    
     CREATE TABLE user_2 (
       id INTEGER NOT NULL PRIMARY KEY,
       name VARCHAR(255) NOT NULL DEFAULT 'flink',
       address VARCHAR(1024),
       phone_number VARCHAR(512),
       email VARCHAR(255)
     );
    INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
    
    
    CREATE DATABASE db_2;
    USE db_2;
    CREATE TABLE user_1 (
      id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL DEFAULT 'flink',
      address VARCHAR(1024),
      phone_number VARCHAR(512),
      email VARCHAR(255)
    );
    INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);
    
    CREATE TABLE user_2 (
      id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL DEFAULT 'flink',
      address VARCHAR(1024),
      phone_number VARCHAR(512),
      email VARCHAR(255)
    );
    INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");
    

    首先,使用如下的命令進入 Flink SQL CLI 容器中:

    docker-compose exec sql-client ./sql-client
    

    LIadHO.md.png

    開啟 checkpoint

    Checkpoint 默認是不開啟的,我們需要開啟 Checkpoint 來讓 Iceberg 可以提交事務。
    并且,mysql-cdc 在 binlog 讀取階段開始前,需要等待一個完整的 checkpoint 來避免 binlog 記錄亂序的情況。

    -- Flink SQL
    -- 每隔 3 秒做一次 checkpoint

    Flink SQL> SET execution.checkpointing.interval = 3s;
    

    創建 MySQL 分庫分表 source 表

    創建 source 表 user_source 來捕獲MySQL中所有 user 表的數據,在表的配置項 database-name , table-name 使用正則表達式來匹配這些表。
    并且,user_source 表也定義了 metadata 列來區分數據是來自哪個數據庫和表。

    CREATE TABLE user_source (
        database_name STRING METADATA VIRTUAL,
        table_name STRING METADATA VIRTUAL,
        `id` DECIMAL(20, 0) NOT NULL,
        name STRING,
        address STRING,
        phone_number STRING,
        email STRING,
        PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'mysql',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'db_[0-9]+',
        'table-name' = 'user_[0-9]+'
      );
    

    創建 Iceberg sink 表

    創建 sink 表 all_users_sink,用來將數據加載至 Iceberg 中。
    在這個 sink 表,考慮到不同的 MySQL 數據庫表的 id 字段的值可能相同,我們定義了復合主鍵 (database_name, table_name, id)。

    CREATE TABLE all_users_sink (
        database_name STRING,
        table_name    STRING,
        `id`          DECIMAL(20, 0) NOT NULL,
        name          STRING,
        address       STRING,
        phone_number  STRING,
        email         STRING,
        PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
      ) WITH (
        'connector'='iceberg',
        'catalog-name'='iceberg_catalog',
        'catalog-type'='hadoop',  
        'warehouse'='file:///tmp/iceberg/warehouse',
        'format-version'='2'
      );
    

    流式寫入 Iceberg

    使用下面的 Flink SQL 語句將數據從 MySQL 寫入 Iceberg 中:

    INSERT INTO all_users_sink select * from user_source;
    

    述命令將會啟動一個流式作業,源源不斷將 MySQL 數據庫中的全量和增量數據同步到 Iceberg 中。

    LIIOpR.md.png

    然后我們就可以使用如下的命令看到 Iceberg 中的寫入的文件:

    docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/

    /tmp/iceberg/warehouse/default_database/
    └── all_users_sink
        ├── data
        │   ├── 00000-0-84b83e87-0e98-48da-8871-4de54d802dc5-00009.parquet
        │   ├── 00000-0-84b83e87-0e98-48da-8871-4de54d802dc5-00011.parquet
        │   ├── 00000-0-84b83e87-0e98-48da-8871-4de54d802dc5-00511.parquet
        │   └── 00000-0-84b83e87-0e98-48da-8871-4de54d802dc5-00512.parquet
        └── metadata
            ├── 6785c966-67e3-43e0-876d-cfc2b77424b4-m0.avro
            ├── c4f04e0f-5f1d-4cd3-a5eb-4f423390011d-m0.avro
            ├── snap-1060385011870418792-1-df87d81d-004f-44d6-acca-1c77e5383647.avro
            ├── snap-1125901484026564419-1-c8e6142a-4702-4bf9-bb6c-937261910d39.avro
            ├── snap-1465929231731371144-1-cd480baf-a496-4f69-bb11-379299782e7a.avro
            ├── snap-1535675730396165219-1-eddfe40e-27bd-4a7a-97b0-191da77d4019.avro
            ├── snap-2621077481890393128-1-fdb33dc2-97a9-4472-bda4-fe0192a983c4.avro
            ├── snap-2886091127939856900-1-94d854db-2081-43b4-9bb3-11f9d0377503.avro
            ├── snap-3343920335928350948-1-19669bbb-7b82-4218-83ea-05c90429ff01.avro
            ├── snap-3566691522613506207-1-59e74ad7-a32e-427f-83c1-640d98b58d24.avro
            ├── snap-3843624394887137001-1-fef2b9b7-b7de-4ece-951b-eb1856a2d195.avro
            ├── snap-4100501778549948477-1-6785c966-67e3-43e0-876d-cfc2b77424b4.avro
            ├── snap-4248879694079296194-1-441e1ce8-6a10-4ebc-82b4-7abf62bc385b.avro
            ├── snap-445137311357959788-1-4e97b44e-a626-402b-b6ca-613e5252ed15.avro
            ├── snap-4453685821727449894-1-a5d3ced5-9d98-419a-aeda-a89e0184aa91.avro
            ├── snap-4652826435458483424-1-144e1141-8da3-450d-ba4d-01858befea48.avro
            ├── snap-4827514150229893384-1-db19f736-209b-44b0-9a4a-a1ecb8532817.avro
            ├── snap-5160869656962357717-1-522bdf2b-fd9d-4c81-9995-6c598e3112a2.avro
            ├── snap-5328679998683573777-1-befea0d5-0312-41db-ab33-04d2f71aa29c.avro
            ├── snap-5468995844667874005-1-4c1db744-6eb6-4c62-a5ce-6162b64ed429.avro
            ├── snap-7392671775005889691-1-f0e79868-ae06-4fe8-9a8e-e0b9f2fe2c12.avro
            ├── snap-7448354638185933171-1-621e2364-508e-47bf-83d0-5c7d72d160c6.avro
            ├── snap-7449633500954413534-1-3c673f73-381e-4917-af09-ce06e75995ee.avro
            ├── snap-7808424372668354882-1-a874a13c-32cc-4b4b-ab45-3042cad872f8.avro
            ├── snap-8487607088527724113-1-86dbb914-c564-4841-a536-be834a09b09d.avro
            ├── snap-882048647352933559-1-c7d1058c-1d60-4624-b592-2d8c9f208946.avro
            ├── snap-9092189266221057431-1-c4f04e0f-5f1d-4cd3-a5eb-4f423390011d.avro
            ├── snap-9149158390097592825-1-fd9e8dd3-519c-4b48-b78c-181ea0fd2aaf.avro
            ├── v1.metadata.json
            ├── v10.metadata.json
            ├── v11.metadata.json
            ├── v12.metadata.json
            ├── v13.metadata.json
            ├── v14.metadata.json
            ├── v15.metadata.json
            ├── v16.metadata.json
            ├── v17.metadata.json
            ├── v18.metadata.json
            ├── v19.metadata.json
            ├── v2.metadata.json
            ├── v20.metadata.json
            ├── v21.metadata.json
            ├── v22.metadata.json
            ├── v23.metadata.json
            ├── v24.metadata.json
            ├── v25.metadata.json
            ├── v26.metadata.json
            ├── v27.metadata.json
            ├── v3.metadata.json
            ├── v4.metadata.json
            ├── v5.metadata.json
            ├── v6.metadata.json
            ├── v7.metadata.json
            ├── v8.metadata.json
            ├── v9.metadata.json
            └── version-hint.text
    
    

    使用下面的 Flink SQL 語句查詢表 all_users_sink 中的數據:

    LIoPtH.md.png

    修改 MySQL 中表的數據,Iceberg 中的表 all_users_sink 中的數據也將實時更新:

    (3.1) 在 db_1.user_1 表中插入新的一行

    --- db_1
    INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","user_111@foo.com");
    

    (3.2) 更新 db_1.user_2 表的數據

    --- db_1
    UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;
    

    (3.3) 在 db_2.user_2 表中刪除一行

    --- db_2
    DELETE FROM db_2.user_2 WHERE id=220;
    

    每執行一步,在 Flink Client CLI 中使用 SELECT * FROM all_users_sink 查詢表 all_users_sink 來看到數據的變化。

    LIoA1I.md.png

    從 Iceberg 的最新結果中可以看到新增了(db_1, user_1, 111)的記錄,(db_1, user_2, 120)的地址更新成了 Beijing,且(db_2, user_2, 220)的記錄被刪除了,與我們在 MySQL 做的數據更新完全一致。

    最后, 關閉所有容器:

    docker-compose down
    

    接下來,將調研如何將Iceberg 與Hive、SparkSQL 整合,讀取和分析Flink CDC寫入Iceberg中的數據.

    參考

    1. Iceberg 實踐 | 基于 Flink CDC 打通數據實時入湖:https://jishuin.proginn.com/p/763bfbd5bdbe
    2. Flink CDC 系列 - 同步 MySQL 分庫分表,構建 Iceberg 實時數據湖:https://developer.aliyun.com/article/841222

    本文摘自 :https://www.cnblogs.com/

    開通會員,享受整站包年服務
    国产呦精品一区二区三区网站|久久www免费人咸|精品无码人妻一区二区|久99久热只有精品国产15|中文字幕亚洲无线码