优雅数据同步--canal实现mysql同步demo

Posted by zhangtao on Friday, December 23, 2022

当需要两张表数据同步的时候,我们会想到几种方案? 最简单的一种方式就是触发器的方式。例如A同步到B,可以通过下面的sql来添加触发器

create trigger tri_trade_update 
after UPDATE 
on `A`
for each row
begin 
update `B` 
set 
company_id = new.`company_id`,
supplier_id =new.`supplier_id`
WHERE id=old.`id`;   
end;

但是这种方式有一定的弊端和局限性,首先就是只局限同库,并且会增大数据库开销,以及无法实现一些自定义的逻辑。

canal是阿里推出的一个开源的中间件,它的原理类似于mysql主从的原理,它把自己伪装成mysql的从库,这样就可以从主库中获取binLog来复制数据

接下来我们自己搭建一个canal同步mysql的一个demo。我们都通过docker来安装mysql和canal,这样比较方便

1、首先在docker搜索mysql的官方镜像。我们选择最新的版本latest然后拉取下来 2、运行这个镜像 3、使用命令把mysql容器内部存储数据文件拷贝到外部目录存储。

docker cp mysql:/etc/mysql/my.cnf /Users/admin/WORK/docker/mysql/config
    docker cp mysql:/var/lib/mysql /Users/admin/WORK/docker/mysql/data

4、 删除运行的mysql容器 5、 重新启动mysql容器 -v 挂载容器文件到外部目录,这样我们就可以持久化mysql的数据不会随着容器关闭而消失。 -p 将容器内mysql的端口映射到外部端口上 -e 设置环境变量

docker run -d \
    --name mysql \
    -p 3306:3306 \
    -v /Users/admin/WORK/docker/mysql/config/my.cnf:/etc/mysql/my.cnf \
    -v /Users/admin/WORK/docker/mysql/data/mysql:/var/lib/mysql \
    -e MYSQL_ROOT_PASSWORD=123456 \
    mysql:latest

1、首先在docker搜索canal-server的官方镜像。选择最新的版本latest然后拉取下来 2、运行这个镜像 3、使用命令把mysql容器内部存储数据文件拷贝到外部目录存储。

docker cp canal:/home/admin/canal-server/conf/example/instance.properties /Users/admin/WORK/docker/cancal/conf

4、 删除运行的canal容器 5、 重新启动canal容器 -v 挂载容器文件到外部目录,这样我们就可以持久化mysql的数据不会随着容器关闭而消失。 -p 将容器内mysql的端口映射到外部端口上 -e 设置环境变量

docker run -d \
    --name canal \
    -p 11111:11111 \
    -v /Users/admin/WORK/docker/cancal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
    canal/canal-server:latest

修改本地my.cnf文件。将canal同步需要的配置添加上

log-bin=mysql-bin #开启binlog
binlog-format=ROW #row格式
server_id=1 #主从标识

创建canal账号,并赋予同步权限

# 创建账号
CREATE USER canal IDENTIFIED BY 'canal'; 
# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新并应用
FLUSH PRIVILEGES;
# 建库
create database canal;
# 查看设置binlog是否生效
show variables like 'log_bin';

修改instance.properties文件

修改canal.instance.master.address配置,ip改成mysql容器的内部ip。注意容器跟容器访问不能使用127.0.0.1

重启mysql容器,重启canal容器

进入canal容器中,观察日志more canal-server/logs/example/example.log 查看是否报错

package logistics.canal;


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Canal测试
 *
 * @author admin
 * @date 2022/12/23
 */
public class CanalTest {
   

    public static void main(String[] args) {
   
        String ip = "127.0.0.1";
        String destination = "example";
        //创建连接对象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(ip, 11111), destination, "", ""
        );

        //进行连接
        canalConnector.connect();
        //进行订阅
        canalConnector.subscribe();

        int batchSize = 5 * 1024;
        //使用死循环不断的获取canal信息
        while (true) {
   
            //获取Message对象
            Message message = canalConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();

            System.out.println("当前监控到的binLog消息数量是:" + size);

            //判断是否有数据
            if (id == -1 || size == 0) {
   
                //如果没有数据,等待1秒
                try {
   
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
   
                    e.printStackTrace();
                }
            } else {
   
                //如果有数据,进行数据解析
                List<Entry> entries = message.getEntries();

                //遍历获取到的Entry集合
                for (Entry entry : entries) {
   
                    System.out.println("----------------------------------------");
                    System.out.println("当前的二进制日志的条目(entry)类型是:" + entry.getEntryType());

                    //如果属于原始数据ROWDATA,进行打印内容
                    if (entry.getEntryType() == EntryType.ROWDATA) {
   
                        try {
   
                            //获取存储的内容
                            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

                            //打印事件的类型,增删改查哪种 eventType
                            System.out.println("事件类型是:" + rowChange.getEventType());

                            //打印改变的内容(增量数据)
                            for (RowData rowData : rowChange.getRowDatasList()) {
   
                                System.out.println("改变前的数据:" + rowData.getBeforeColumnsList());
                                System.out.println("改变后的数据:" + rowData.getAfterColumnsList());
                            }

                        } catch (Exception e) {
   
                            e.printStackTrace();
                        }
                    }
                }
                //消息确认已经处理了
                canalConnector.ack(id);
            }
        }
    }
}

开启客户端,mysql执行sql语句。我们可以看到客户端日志变化,说明成功了 img

1、mysql启动时报错。Different lower_case_table_names settings for server (‘1‘) and data dictionary (‘0‘)

解决方法: my.cnf里面添加lower_case_table_names = 1。同时删除data里面的内容,去掉data目录的挂载,重新启动容器再复制一份到data中。这样再次启动就不会报错了

2、canal日志报错MySQL8.0 caching_sha2_password Auth failed,无法连接mysql

解决方法: mysql执行下面语句

ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;