To verify the functionality of the whole pipeline quickly, every component is deployed in standalone mode. If physical resources are tight, all of the components in this article can be deployed onto a single virtual machine (4 GB memory, 1 CPU).
For a production deployment, each component should be replaced with a highly available cluster deployment.
A dedicated single-node Zookeeper environment is created separately; the Flink, Kafka, and Canal components share this Zookeeper environment.

For every component that needs a JRE (Flink, Kafka, Canal, Zookeeper), each one uses its own independent JRE, since upgrading a shared JRE could affect the other applications.

This article is split into two parts: the first seven sections cover building the base environment, and the final section explains how data flows through the components.

The data moves through the following components:

1. The MySQL data source generates the Binlog.
2. Canal reads the Binlog, produces Canal JSON, and pushes it to the designated Kafka topic.
3. Flink consumes the data in the Kafka topic via the flink-sql-connector-kafka API.
4. Flink then writes the data into TiDB via flink-connector-jdbc.

The TiDB + Flink combination supports the development and running of many different kinds of applications.
Its main features currently include:

- Unified batch and stream processing.
- Sophisticated state management.
- Event-time support.
- Exactly-once state consistency guarantees.

Flink can run on a variety of resource-management frameworks, including YARN, Mesos, and Kubernetes, and also supports standalone deployment on bare-metal clusters. TiDB can be deployed on AWS, Kubernetes, and GCP GKE, and likewise supports standalone deployment on bare-metal clusters via TiUP.

Typical applications of the TiDB + Flink stack fall into the following categories:

- Event-driven applications: anti-fraud, anomaly detection, rule-based alerting, business-process monitoring.
- Data-analytics applications: network quality monitoring, product update and experiment evaluation analysis, ad-hoc analysis of fact data, large-scale graph analysis.
- Data-pipeline applications: real-time search-index building for e-commerce, continuous ETL for e-commerce.

2. Environment overview

2.1 Operating system environment
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
2.2 Software environment
| Item | Version | Download link |
| ---- | ------- | ------------- |
| TiDB | v4.0.9 | https://download.pingcap.org/tidb-community-server-v4.0.9-linux-amd64.tar.gz |
| Kafka | v2.7.0 | https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz |
| Flink | v1.12.1 | https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.1/flink-1.12.1-bin-scala_2.11.tgz |
| JRE | v1.8.0_281 | https://javadl.oracle.com/webapps/download/AutoDL?BundleId=244058_89d678f2be164786b292527658ca1605 |
| Zookeeper | v3.6.2 | https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz |
| flink-sql-connector-kafka | v1.12.0 | https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.12.0/flink-sql-connector-kafka_2.12-1.12.0.jar |
| flink-connector-jdbc | v1.12.0 | https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.12.0/flink-connector-jdbc_2.12-1.12.0.jar |
| MySQL | v8.0.23 | https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz |
| Canal | v1.1.4 | https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz |
2.3 Machine allocation
| Hostname | IP | Component |
| -------- | -- | --------- |
| r21 | 192.168.12.21 | TiDB Cluster |
| r22 | 192.168.12.22 | Kafka |
| r23 | 192.168.12.23 | Flink |
| r24 | 192.168.12.24 | Zookeeper |
| r25 | 192.168.12.25 | MySQL |
| r26 | 192.168.12.26 | Canal |
3. Deploying the TiDB Cluster

Compared with a traditional standalone database, TiDB has the following advantages:

- A purely distributed architecture with good scalability, supporting elastic scale-out and scale-in.
- SQL support: it exposes the MySQL wire protocol and is compatible with most MySQL syntax, so in most scenarios it can replace MySQL directly.
- High availability by default: when a minority of replicas fail, the database repairs the data and fails over automatically, transparently to the application.
- ACID transactions, friendly to scenarios with strong consistency requirements, e.g. bank transfers.
- A rich tool-chain ecosystem covering data migration, replication, backup, and more.

In its kernel design, TiDB splits the overall architecture into multiple modules that communicate with each other and together form a complete TiDB system.
This article only performs the simplest functional tests, so a single-node, single-replica TiDB is deployed, involving the following three modules:

- TiDB Server: the SQL layer. It exposes the MySQL-protocol connection endpoint, is responsible for accepting client connections, performs SQL parsing and optimization, and finally generates a distributed execution plan.
- PD (Placement Driver) Server: the metadata management module for the whole TiDB cluster. It stores the real-time data distribution of every TiKV node and the overall topology of the cluster, provides the TiDB Dashboard management UI, and allocates transaction IDs for distributed transactions.
- TiKV Server: responsible for storing data. Seen from the outside, TiKV is a distributed key-value storage engine that provides transactions.

3.1 TiUP deployment template file
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
  user: "tidb"
  ssh_port: 22
  deploy_dir: "/opt/tidb-c1/"
  data_dir: "/opt/tidb-c1/data/"

# # Monitored variables are applied to all the machines.
#monitored:
#  node_exporter_port: 19100
#  blackbox_exporter_port: 39115
#  deploy_dir: "/opt/tidb-c3/monitored"
#  data_dir: "/opt/tidb-c3/data/monitored"
#  log_dir: "/opt/tidb-c3/log/monitored"

# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# #   readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
  tidb:
    log.slow-threshold: 300
    binlog.enable: false
    binlog.ignore-error: false
    tikv-client.copr-cache.enable: true
  tikv:
    server.grpc-concurrency: 4
    raftstore.apply-pool-size: 2
    raftstore.store-pool-size: 2
    rocksdb.max-sub-compactions: 1
    storage.block-cache.capacity: "16GB"
    readpool.unified.max-thread-count: 12
    readpool.storage.use-unified-pool: false
    readpool.coprocessor.use-unified-pool: true
    raftdb.rate-bytes-per-sec: 0
  pd:
    schedule.leader-schedule-limit: 4
    schedule.region-schedule-limit: 2048
    schedule.replica-schedule-limit: 64

pd_servers:
  - host: 192.168.12.21
    ssh_port: 22
    name: "pd-2"
    client_port: 12379
    peer_port: 12380
    deploy_dir: "/opt/tidb-c1/pd-12379"
    data_dir: "/opt/tidb-c1/data/pd-12379"
    log_dir: "/opt/tidb-c1/log/pd-12379"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.pd` values.
    config:
      schedule.max-merge-region-size: 20
      schedule.max-merge-region-keys: 200000

tidb_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 14000
    status_port: 12080
    deploy_dir: "/opt/tidb-c1/tidb-14000"
    log_dir: "/opt/tidb-c1/log/tidb-14000"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tidb` values.
    config:
      log.slow-query-file: tidb-slow-overwrited.log
      tikv-client.copr-cache.enable: true

tikv_servers:
  - host: 192.168.12.21
    ssh_port: 22
    port: 12160
    status_port: 12180
    deploy_dir: "/opt/tidb-c1/tikv-12160"
    data_dir: "/opt/tidb-c1/data/tikv-12160"
    log_dir: "/opt/tidb-c1/log/tikv-12160"
    numa_node: "0"
    # # The following configs are used to overwrite the `server_configs.tikv` values.
    config:
      server.grpc-concurrency: 4
      #server.labels: { zone: "zone1", dc: "dc1", host: "host1" }

#monitoring_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    port: 19090
#    deploy_dir: "/opt/tidb-c1/prometheus-19090"
#    data_dir: "/opt/tidb-c1/data/prometheus-19090"
#    log_dir: "/opt/tidb-c1/log/prometheus-19090"

#grafana_servers:
#  - host: 192.168.12.21
#    port: 13000
#    deploy_dir: "/opt/tidb-c1/grafana-13000"

#alertmanager_servers:
#  - host: 192.168.12.21
#    ssh_port: 22
#    web_port: 19093
#    cluster_port: 19094
#    deploy_dir: "/opt/tidb-c1/alertmanager-19093"
#    data_dir: "/opt/tidb-c1/data/alertmanager-19093"
#    log_dir: "/opt/tidb-c1/log/alertmanager-19093"
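With the template saved, it can be rolled out with TiUP — a sketch; the topology file name ./topology.yaml is an assumption:

[root@r20 topology]# tiup cluster deploy tidb-c1-v409 v4.0.9 ./topology.yaml --user root -p
[root@r20 topology]# tiup cluster start tidb-c1-v409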
3.2 TiDB Cluster environment

Deploying a TiDB Cluster is not the focus of this article. As a quick experimental environment, a single-replica TiDB cluster is deployed on one machine only, and the monitoring stack is not deployed.
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type:    tidb
Cluster name:    tidb-c1-v409
Cluster version: v4.0.9
SSH type:        builtin
Dashboard URL:   http://192.168.12.21:12379/dashboard
ID                   Role  Host           Ports        OS/Arch       Status   Data Dir                      Deploy Dir
--                   ----  ----           -----        -------       ------   --------                      ----------
192.168.12.21:12379  pd    192.168.12.21  12379/12380  linux/x86_64  Up|L|UI  /opt/tidb-c1/data/pd-12379    /opt/tidb-c1/pd-12379
192.168.12.21:14000  tidb  192.168.12.21  14000/12080  linux/x86_64  Up       -                             /opt/tidb-c1/tidb-14000
192.168.12.21:12160  tikv  192.168.12.21  12160/12180  linux/x86_64  Up       /opt/tidb-c1/data/tikv-12160  /opt/tidb-c1/tikv-12160
Total nodes: 4
Create a table for testing:

mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                  |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1    | CREATE TABLE `t1` (
  `id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
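For reference, a DDL that reproduces the table shown above, run against the TiDB server port from the topology (14000) — a sketch:

[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table test.t1 (id int(11) not null, primary key (id));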
4. Deploying Zookeeper

In this experiment a separate Zookeeper environment is configured to provide services for both Kafka and Flink.

As an experimental demonstration, only a standalone instance is deployed.

4.1 Unpack the Zookeeper package
[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
4.2 Deploy a JRE for Zookeeper
[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
Edit the /opt/zookeeper/bin/zkEnv.sh file and add the JAVA_HOME environment variable:

## add the following env var at the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre
4.3 Create the Zookeeper configuration file

[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
4.4 Start Zookeeper
[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start
4.5 Check Zookeeper's status

## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone

## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address    Foreign Address    State    PID/Program name
tcp        0      0 0.0.0.0:22       0.0.0.0:*          LISTEN   942/sshd
tcp6       0      0 :::2181          :::*               LISTEN   15062/java
tcp6       0      0 :::8080          :::*               LISTEN   15062/java
tcp6       0      0 :::22            :::*               LISTEN   942/sshd
tcp6       0      0 :::44505         :::*               LISTEN   15062/java

## use the zkCli tool to check the zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
4.6 A suggestion about Zookeeper

I have one personal, tentative suggestion about Zookeeper:

Any clustered Zookeeper deployment should have network monitoring enabled, with particular attention to network bandwidth among the system metrics — a sketch of what that can look like follows.
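A minimal sketch, assuming the mntr four-letter command has been whitelisted (ZooKeeper 3.5+ allows only srvr by default) and that the NIC is named eth0:

## enable mntr first in zoo.cfg:  4lw.commands.whitelist=srvr,stat,mntr

# sample Zookeeper's own packet counters
echo mntr | nc 192.168.12.24 2181 | grep -E 'zk_packets_(received|sent)'

# watch NIC throughput on the Zookeeper host (requires sysstat; the interface name is an assumption)
sar -n DEV 5 | grep eth0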
5. Deploying Kafka

Kafka is a distributed stream-processing platform, mainly used in two broad classes of applications:

- Building real-time streaming data pipelines that reliably move data between systems or applications (comparable to a message queue).
- Building real-time streaming applications that transform or react to streams of data (that is, stream processing, transforming data internally between Kafka Streams topics).

Kafka has four core APIs:

- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more topics, transforming the streams along the way.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems — for example, a connector to a relational database that captures every change to a table.

This experiment only verifies functionality, so only a standalone Kafka environment is built.
5.1 Download and unpack Kafka
[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka
5.2 Deploy a JRE for Kafka

[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
Modify the JRE environment variable for Kafka:

[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh

## add the following line at the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre
5.3 Modify the Kafka configuration file

Modify the Kafka configuration file /opt/kafka/config/server.properties:

## change the following variables in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=192.168.12.24:2181
5.4 Start Kafka
[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
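Before wiring Canal in, it is worth confirming that the broker round-trips messages at all. A minimal smoke test with the console tools shipped in the Kafka distribution — the topic name smoke-test is made up for this check:

## create a throwaway topic
[root@r22 bin]# /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.12.24:2181 \
> --replication-factor 1 --partitions 1 --topic smoke-test

## publish one message, then read it back
[root@r22 bin]# echo "hello" | /opt/kafka/bin/kafka-console-producer.sh \
> --broker-list 192.168.12.22:9092 --topic smoke-test
[root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh \
> --bootstrap-server 192.168.12.22:9092 --topic smoke-test --from-beginning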
5.5 Check the Kafka version

Kafka does not provide a --version option to report its version directly, but the version can be read from the library file names:

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root   41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root  892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root     821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...

Here 2.13 is the Scala version and 2.7.0 is the Kafka version.
6. Deploying Flink

Apache Flink is a framework and distributed processing engine for stateful computation over unbounded and bounded data streams. Flink runs in all common cluster environments and performs computation at in-memory speed and at any scale, with support for high throughput and low latency.

This experiment only performs functional tests, so only a standalone Flink environment is deployed.
6.1 Download and distribute Flink

[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink
6.2 Deploy a JRE for Flink

[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
6.3 Add the libraries Flink needs

To consume Kafka data, Flink needs the flink-sql-connector-kafka package.

To connect to MySQL/TiDB, Flink needs the flink-connector-jdbc package.

[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
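One caveat: flink-connector-jdbc does not bundle a JDBC driver, so a MySQL driver jar must also sit in /opt/flink/lib for the TiDB sink to work. A sketch — the exact driver version is an assumption; any recent mysql-connector-java should do:

## the JDBC connector needs a MySQL driver on the classpath as well
[root@r23 soft]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar
[root@r23 soft]# mv mysql-connector-java-5.1.49.jar /opt/flink/lib/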
6.4 Modify the Flink configuration file

## add or modify the following lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre
6.5 Start Flink

[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
6.6 View the Flink GUI
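The web UI listens on port 8081 by default, so http://192.168.12.23:8081 should now show the standalone cluster with its task manager. On a headless host, the REST API that backs the UI can be queried instead — a small sketch:

## the REST API behind the web UI (default port 8081)
[root@r23 ~]# curl http://192.168.12.23:8081/overview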
7. Deploying MySQL

7.1 Unpack the MySQL package

[root@r25 soft]# tar vxf mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz
[root@r25 soft]# mv mysql-8.0.23-linux-glibc2.12-x86_64 /opt/mysql/
7.2 Create the MySQL service file

[root@r25 ~]# touch /opt/mysql/support-files/mysqld.service
[root@r25 support-files]# cat mysqld.service
[Unit]
Description=MySQL 8.0 database server
After=syslog.target
After=network.target

[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
Environment=MYSQLD_PARENT_PID=1

[Install]
WantedBy=multi-user.target

## copy mysqld.service to /usr/lib/systemd/system/
[root@r25 support-files]# cp mysqld.service /usr/lib/systemd/system/
7.3 Create the my.cnf file

[root@r34 opt]# cat /etc/my.cnf
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data/mysql.socket
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log/error.log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log/show.log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables
7.4 Initialize and start MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld

## check the mysql temp password in /opt/mysql/log/error.log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-
7.5 Create a new MySQL user for Canal to connect with

## change the mysql temp password first
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected (0.00 sec)

## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected (0.01 sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected (0.00 sec)

## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.01 sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
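A quick sanity check that the replication user ended up with exactly the privileges Canal needs (a sketch):

[root@r25 ~]# mysql -uroot -pmysql -e "show grants for 'canal'@'%';"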
7.6 Create a test table in MySQL

mysql> show create table test.t2;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table                                                                     |
+-------+----------------------------------------------------------------------------------+
| t2    | CREATE TABLE `t2` (
  `id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set (0.00 sec)
8. Deploying Canal

Canal's main purpose is to provide incremental data subscription and consumption based on parsing the incremental logs (binlogs) of a MySQL database.

In the early days, Alibaba had cross-datacenter synchronization requirements due to its dual deployment across Hangzhou and the United States; these were mostly implemented with business-level triggers to capture incremental changes.

From 2010 onwards, the business gradually moved to parsing database logs to obtain incremental changes for synchronization, which spawned a large number of incremental database subscription and consumption services.

Services built on incremental log subscription and consumption include:

- Database mirroring.
- Real-time database backup.
- Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, and so on).
- Business cache refreshing.
- Incremental data processing with business logic.

Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
8.1 Unpack the Canal package
[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4.tar.gz -C /opt/canal
8.2 Deploy a JRE for Canal

[root@r26 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre

## configure the jre: add the following line at the head of /opt/canal/bin/startup.sh
JAVA=/opt/canal/jre/bin/java
8.3 Modify Canal's configuration files

Modify the /opt/canal/conf/canal.properties configuration file:

## modify the following configuration
canal.zkServers = 192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example    ## a folder named example must be created under /opt/canal/conf to hold this destination's configuration
canal.mq.servers = 192.168.12.22:9092
Modify the /opt/canal/conf/example/instance.properties configuration file:

## modify the following configuration
canal.instance.master.address=192.168.12.25:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*    ## table filter; this default matches every schema and table
canal.mq.topic=canal-kafka
9. Configuring the data flow

9.1 The MySQL Binlog -> Canal -> Kafka path

9.1.1 Check the MySQL Binlog

Check the MySQL Binlog information and make sure binary logging is working:

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 |     2888 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
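Canal can only parse ROW-format binlogs. MySQL 8 turns binary logging on with ROW format by default, but it is cheap to double-check — a minimal sketch:

## Canal needs log_bin=ON and binlog_format=ROW (the MySQL 8 defaults)
[root@r25 ~]# mysql -uroot -pmysql -e "show variables like 'log_bin'; show variables like 'binlog_format';"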
9.1.2 Create a topic in Kafka

Create a topic named canal-kafka in Kafka. The topic name must match canal.mq.topic=canal-kafka in the Canal configuration file /opt/canal/conf/example/instance.properties:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) (kafka.server.ReplicaFetcherManager)
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 1, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 12800000, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 (kafka.cluster.Partition)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 (kafka.cluster.Partition)
List all the topics in Kafka:

[root@r22 kafka]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test
Describe the topic:

[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka
Topic: ticdc-test  PartitionCount: 1  ReplicationFactor: 1  Configs: max.message.bytes=12800000,flush.messages=1
    Topic: ticdc-test  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
9.1.3 Start Canal

Before starting Canal, check that the relevant ports are reachable from the Canal node:

## check MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306

## check Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092

## check zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181
Start Canal:

[root@r26 bin]# /opt/canal/bin/startup.sh
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../conf/logback.xml
canal conf : /opt/canal/bin/../conf/canal.properties
CLASSPATH : /opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar: ... (the remaining bundled jars are omitted here for brevity) ... :/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar:
cd to /opt/canal/bin for continue
9.1.4 Check the Canal log

Check /opt/canal/logs/example/example.log:

2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = /192.168.12.25:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump
9.1.5 Check the consumer messages in Kafka

Insert a test record into MySQL:

mysql> insert into t2 values(1);
Query OK, 1 row affected (0.00 sec)
Check the consumer; the test data just inserted has already arrived:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}
9.2 The Kafka -> Flink path

Create table t2 in Flink, with connector type kafka:

## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal-kafka',
>  'properties.bootstrap.servers' = '192.168.12.22:9092',
>  'properties.group.id' = 'canal-kafka-consumer-group',
>  'format' = 'canal-json',
>  'scan.startup.mode' = 'latest-offset'
> );

Flink SQL> select * from t2;
Insert another test record into MySQL:

mysql> insert into test.t2 values(2);
Query OK, 1 row affected (0.00 sec)
The data is synchronized into Flink in real time:

Flink SQL> select * from t2;
 Refresh: 1 s          Page: Last of 1          Updated: 02:49:27.366

                        id
                         2
9.3 The Flink -> TiDB path

9.3.1 Create a test table in the downstream TiDB

[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected (0.31 sec)
9.3.2 Create the test table in Flink

Flink SQL> CREATE TABLE t3 (
>     id int
> ) with (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://192.168.12.21:14000/test',
>     'table-name' = 't3',
>     'username' = 'root',
>     'password' = 'mysql'
> );

Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e
9.3.3 Check the inserted data in the downstream TiDB

mysql> select * from test.t3;
+------+
| id   |
+------+
|    3 |
+------+
1 row in set (0.00 sec)
This article is original content from Alibaba Cloud and may not be reproduced without permission.