GVKun编程网logo

mssql mysql 数据同步

1

对于想了解mssqlmysql数据同步的读者,本文将提供新的信息,并且为您提供关于3、安装Logstash的plugin,mysql数据同步、5分钟搞定关系型数据库到Flink数据同步、canal数据

对于想了解mssql mysql 数据同步的读者,本文将提供新的信息,并且为您提供关于3、安装 Logstash 的 plugin,mysql 数据同步、5分钟搞定 关系型数据库 到 Flink 数据同步、canal 数据同步、Canal+Kafka 实现 MySQL 与 Redis 数据同步的有价值信息。

本文目录一览:

mssql mysql 数据同步

mssql mysql 数据同步

现在用 PHP 进行数据同步,定时执行。有没有什么高效率的同步方法。mssql 是主数据库,mysql 是从库,如果 mssql 有更新,插入,删除,如何同步效率最高

3、安装 Logstash 的 plugin,mysql 数据同步

3、安装 Logstash 的 plugin,mysql 数据同步

一、插件安装。

插件安装、卸载、更新的命令。

官网:https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html#installing-local-plugins

1、查询已安装的插件:

Logstash 发行包捆绑常见的插件,以便您可以开箱即用。可以查询当前可用的插件:

bin/logstash-plugin list 将列出所有安装的插件
bin/logstash-plugin list --verbose 将列出已安装的插件与版本信息
bin/logstash-plugin list ''*namefragment*'' 将列出所有安装的包含namefragment的插件
bin/logstash-plugin list --group output 将列出特定组的所有安装的插件(输入,过滤器,编解码器,输出)

2、安装插件:

线上安装插件:

bin/logstash-plugin install logstash-output-kafka

线下安装插件:

bin/logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem

3、更新插件:

bin/logstash-plugin update 更新所有已经安装的插件
bin/logstash-plugin update logstash-output-kafka 更新插件logstash-output-kafka

4、移除插件:

bin/logstash-plugin remove logstash-output-kafka

二、实现 mysql 数据同步。

1、安装相关插件:

input 我这里使用 logstash-input-jdbc 插件,output 使用 logstash-output-jdbc 插件。因为 logstash 自带了 logstash-input-jdbc 插件。所以这里只安装 logstash-output-jdbc。

我这里使用线上安装:

bin/logstash-plugin install logstash-output-jdbc

1、下载插件。下载地址:https://github.com/theangryangel/logstash-output-jdbc。

2、线下安装,参考下面 2 个博客:

        http://blog.csdn.net/yeyuma/article/details/50240595

        https://my.oschina.net/MrYx3en/blog/508230

2、配置文件:

input {
  stdin { } 
  jdbc {
    jdbc_driver_library => "G:/MvnRepository/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/xnc"
    jdbc_user => "root"
    jdbc_password => "123456"
  # or jdbc_password_filepath => "/path/to/my/password_file"
  # where p.update_time >= :sql_last_start
    statement => "SELECT id, product_spec_id,zone_id,recorded_by,CAST(price_per_unit AS CHAR) price_per_unit,uom,latest,recording_date,create_time,update_time from price p"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
  }
}
output {
  #标准输出。为了测试
  stdout { codec => rubydebug }
  jdbc {
        driver_class => "com.mysql.jdbc.Driver"
        driver_jar_path => "G:/MvnRepository/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"
		connection_string => "jdbc:mysql://localhost:3306/xncprice"
		username => "root"
		password => "123456"
		statement => [ "INSERT INTO price (id, product_spec_id, zone_id,recorded_by,price_per_unit,uom,latest,recording_date,create_time,update_time) VALUES(?,?,?,?,CAST(? AS decimal),?,?,?,?,?)", "id", "product_spec_id", "zone_id", "recorded_by","price_per_unit", "uom" , "latest" , "recording_date" , "create_time" , "update_time" ]
	}
  
}

3、运行。

logstash -f G:\logstash-5.5.1\config\my-logstash.conf

这里需要注意数据库类型。因为我的 price_per_unit 是 decimal 类型的,在运行之后,price_per_unit 插入的数据一直是 null,改了目标类型为 double 还是不行。解决方式有 2 个:

  1. 改源类型(input 中 price_per_unit 的数据类型)为 double。
  2. 查询的时候把源的 decimal 转换为 char 类型,插入目标的时候再转为 decimal。(我使用这个)。

4、同步 update 操作。

把 output 使用 REPLACE 语句,如果数据 id 重复时候,直接用后面的数据替换前面的数据。

output {
  #标准输出。为了测试
  stdout { codec => rubydebug }
  jdbc {
        driver_class => "com.mysql.jdbc.Driver"
        driver_jar_path => "G:/MvnRepository/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"
		connection_string => "jdbc:mysql://localhost:3306/xncprice"
		username => "root"
		password => "123456"
		statement => [ "REPLACE INTO price (id, product_spec_id, zone_id,recorded_by,price_per_unit,uom,latest,recording_date,create_time,update_time) VALUES(?,?,?,?,CAST(? AS decimal),?,?,?,?,?)", "id", "product_spec_id", "zone_id", "recorded_by","price_per_unit", "uom" , "latest" , "recording_date" , "create_time" , "update_time" ]
	}

 

 

5分钟搞定 关系型数据库 到 Flink 数据同步

简述

实时数据处理领域中,使用 Flink 方式,除了从日志服务订阅埋点数据外,总离不开从关系型数据库订阅并处理相关业务数据,这时就需要监测并捕获数据库增量数据,将变更按发生的顺序写入到消息中间件以供计算(或消费)。 本文主要介绍如何通过 CloudCanal 快速构建一条高效稳定运行的 MysqL -> Kafka -> Flink 数据同步链路。

技术点

兼容多种常见消息结构

CloudCanal 目前支持 Debezium Envelope (新增)CanalAliyun DTS Avro 等多种流行消息结构,对数据下游消费比较友好。 本次对 Debezium Envelope 消息格式的支持,我们采用了一种轻量的方式做到完全兼容,充分利用 CloudCanal 增量组件,扩展数据序列化器 (EnvelopDeserialize),得到 Envelop 消息并发送到 Kafka 中。 其中 Envelop 的消息结构分为 PayloadSchema 两部分

  • Payload:存储具体数据
  • Schema:定义 Payload 的解析格式 (默认关闭)
{
  "payload":{
    "after":{
      "column_1":"3",
      ...
    },
    "before":null,
    "op":"c",
    "source":{
      "db":"kafka_test",
      "table":"new_table"
      "pos":110341861,
      "ts_ms":1659614884026,
      ...
    },
    "ts_ms":1659614884026
  },
  "schema":{
    "fields":[
      {
        "field":"after",
        "fields":[
          {
            "field":"column_1",
            "isPK":true,
            "jdbType":4,
            "type":"int(11)"
          },
          ...
        ],
        "type":"struct"
      },
      ...
    ],
    "type":"struct"
  }
}

高度可视化的CDC

CDC 工具如 FlinkCDCMaxwellDebezium ... 各有特色,CloudCanal 相对这些产品,最大的特点是高度可视化,自动化,下表针对目标端为Kafka 的 CDC 简要做了一些对比。

CloudCanal FlinkCDC Maxwell
产品化 完备 基础
同步对象配置 可视化 代码 配置文件
封装格式 多种常用格式 自定义 JSON
高可用
数据初始化(snapshot) 实例级 实例级 单表
源端支持 ORACLE,MysqL,sqlServer,MongoDB,Postgresql... ORACLE,MysqL,sqlServer,MongoDB,Postgresql... MysqL

CloudCanal 在平衡性能的基础上,提供多种关系型数据源的同步,以及反向同步;提供便捷的可视化操作、轻巧的数据源添加、轻便的参数配置; 提供多种常见的消息格式,仅仅通过鼠标点击,就可以使用其他 CDC 的消息格式的传输,让数据处理变的异常的快捷、方便。 其中经过我们在相同环境的测试下, CloudCanal 在高写入的 MysqL 场景中,处理数据的效率表现的很出色,后续我们会继续对 CloudCanal 进行优化,提升整体的性能。 综上,相比与类似的 CDC 产品来说,CloudCanal 简单轻巧并集成一体化的操作占据了很大的优势。

Flink 流式计算中不仅要订阅日志服务器的日志埋点信息,同样需要业务数据库中的信息,通过 CDC 工具订阅数据,能减少查询对业务数据库产生的压力还能以流的形式传输,方便与日志服务器中的数据进行关联处理。 实际开发中,可以将业务数据库中的信息提取过滤之后动态的放入 Hbase 中作为维度数据,方便相关联的宽表进行关联查询; 也可以对数据进行开窗、分组、聚合,同样也可以下沉到其他的 Kafka 消费者组中,实现数据的分层。

image.png

操作示例

前置条件

  • 本例使用 Envelop 消息格式,关系型数据库 MysqL 为示例,展示 MysqL 对接 Flink 的 Demo
  • 登陆 CloudCanal SaaS版,使用参见快速上手文档
  • 准备好 1 个 MysqL 实例,1 个 Kafka 实例(本例使用自己搭建的 MysqL 5.6,阿里云 Kafka 2.2)
  • 准备好 Flink 消费端程序,配置好相关信息:flink-demo 下载
  • 登录 CloudCanal 平台,添加 Kafka,MysqL

截屏2022-08-17 17.12.13.png

  • Kafka 自定义一个主题 topic_1,并创建一条 MysqL -> Kafka 链路作为增量数据来源

任务创建

  • 首先配置 FlinkDemo **程序的 **阿里云 Kafka 相关信息

截屏2022-08-17 17.09.12.png

  • 运行 FlinkDemo 程序,等待消费 MysqL 同步 Kafka 的数据(程序不要关闭)

截屏2022-08-17 17.08.50.png

  • *任务管理 *-> *任务创建 *
  • 测试链接并选择 目标 数据库,*并选择 DebeziumEnvelope 消息格式,和 topic_1 主题 *(在阿里云里提前创建)

截屏2022-08-17 17.08.18.png

  • 选择 数据同步,不勾选 全量数据初始化,其他选项默认

截屏2022-08-17 17.07.46.png

  • 选择需要迁移同步的表 table1 *和对应的 Kafka 主题 *topic_1

截屏2022-08-17 17.07.19.png

持续点击下一步,并创建出数据同步任务。

  • MysqL **生成数据,MysqL -> **Kafka(topic_1) -> Flink
  • FlinkDemo 接收到 Kafka(topic_1) 数据,下沉到 topic_2 主题,打印并输出;这里 Flink 程序可以做更多的流式计算的操作,FlinkDemo 只是演示了最基本的数据传输案例

截屏2022-08-17 17.10.05.png

常见问题

还支持哪些源端数据源呢?

目前开放 MysqL、Oracle,sqlServer,Postgres,MongoDB 到 Kafka,如果各位有需求,可以在社区反馈给我们。

支持 DDL 消息同步吗?

目前 关系型数据到 kafka 是支持 DDL 消息的同步的,可以将 关系型数据库 DDL 的变化同步到 Kafka 当中。

总结

本文简单介绍了如何使用 CloudCanal  进行 MysqL -> Kafka -> Flink 数据迁移同步。各位读者朋友,如果你觉得还不错,请点赞、评论加转发吧。

canal 数据同步

canal 数据同步

一、canal 数据同步整理了解

https://img-blog.csdnimg.cn/20191104101735947.png

主从复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave master binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log slave (canal )
- canal 解析 binary log 对象(原始为 byte )

二、主要功能模块了解

canal 同步服务主要分为 canal 服务端 canalServer(canalDeployer)和客户端 canalClient(canalAdapter)两部分

1.canalServer 主要功能:

  • 加载相关配置信息启动相应的实例 instance,链接到相应的实例数据库上,对其进行监控
  • 接入数据,发现有变化模拟 slave 协议和实例数据库进行交互,对 binlog 日志进行解析(parser)
  • 对解析后的 binlog 进行过滤,加工,分发,(sink,是 parser 和 store 服务的连接器)
  • 对最终处理后的结果进行存储(eventStore)/ 若配置的是 Kafka 等 mq,则将处理后的数据发送到 Kafka 等 mq 上

image2022-5-17_17-36-29.png

2.canalClient 主要功能:

  • 加载相关配置获取需要链接是 server 实例
  • 死循环通过 TCP/MQ 方式去 server 中定量获取 binlog 解析后的内容(Message 对象形式
  • Message 对象进行解析,解析后为 List<CommonMessage> 对象
  • 将获取后的数据转化成 ES 要求的格式去 es 中进行更新
    • 将 List<CommonMessage> 对象转化为 List<Dml > 对象
    • 批量进行更新 es 数据操作
    • 判断 insert、update、delete 操作
    • 判断是单表操作还是主从多表操作
    • 封装 es 操作语句进行 es 数据更新:

3.binlog 解析示例数据

--insert

原执行 sql:

INSERT INTO product (id, title, sub_title, price, pic) VALUES ( 6, '' 小米 6'', '' 测试全面屏手机 66666'', 6666.00, NULL );
INSERT INTO product (id, title, sub_title, price, pic) VALUES ( 7, '' 小米 7'', '' 测试全面屏手机 77777'', 7777.00, NULL );
INSERT INTO product (id, title, sub_title, price, pic) VALUES ( 8, '' 小米 8'', '' 测试全面屏手机 88888'', 7888.00, NULL );

canalServer 解析 binlog 后转为对象后的内容 List<CommonMessage>:

[CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''INSERT'', es=1652784172000, ts=1652784172861, sql='''', data=[{id=6, title = 小米 6, sub_title = 测试全面屏手机 66666, price=6666.0, pic=null}], old=null}, CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''INSERT'', es=1652784172000, ts=1652784172862, sql='''', data=[{id=7, title = 小米 7, sub_title = 测试全面屏手机 77777, price=7777.0, pic=null}], old=null}, CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''INSERT'', es=1652784172000, ts=1652784172863, sql='''', data=[{id=8, title = 小米 8, sub_title = 测试全面屏手机 88888, price=7888.0, pic=null}], old=null}]

转为 List<Dml> 对象:

[Dml {destination=''example'', database=''canal'', table=''product'', type=''INSERT'', es=1652784172000, ts=1652784172861, sql='''', data=[{id=6, title = 小米 6, sub_title = 测试全面屏手机 66666, price=6666.0, pic=null}], old=null}, Dml {destination=''example'', database=''canal'', table=''product'', type=''INSERT'', es=1652784172000, ts=1652784172862, sql='''', data=[{id=7, title = 小米 7, sub_title = 测试全面屏手机 77777, price=7777.0, pic=null}], old=null}, Dml {destination=''example'', database=''canal'', table=''product'', type=''INSERT'', es=1652784172000, ts=1652784172863, sql='''', data=[{id=8, title = 小米 8, sub_title = 测试全面屏手机 88888, price=7888.0, pic=null}], old=null}]

单个 Dml  json 格式示例:

{
    "data":[
        {
            "id":6,
            "title":"小米 6",
            "sub_title":"测试全面屏手机 66666",
            "price":6666,
            "pic":null
        }

    ]
,
    "database":"canal",
    "destination":"example",
    "es":1652784172000,
    "groupId":"g1",
    "isDdl":false,
    "old":null,
    "pkNames":[
        "id"
    ]
,
    "sql":"",
    "table":"product",
    "ts":1652784172861,
    "type":"INSERT"
}

--update

原执行 sql:

update product set title='' 小米 666'' where id=6;
update product set title='' 小米 777'' where id=7;
update product set title='' 小米 888'' where id=8;

canalServer 解析 binlog 后转为对象后的内容 List<CommonMessage>:

[CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''UPDATE'', es=1652784882000, ts=1652784882435, sql='''', data=[{id=6, title = 小米 666, sub_title = 测试全面屏手机 66666, price=6666.0, pic=null}], old=[{title = 小米 6}]}, CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''UPDATE'', es=1652784882000, ts=1652784882437, sql='''', data=[{id=7, title = 小米 777, sub_title = 测试全面屏手机 77777, price=7777.0, pic=null}], old=[{title = 小米 7}]}, CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''UPDATE'', es=1652784882000, ts=1652784882437, sql='''', data=[{id=8, title = 小米 888, sub_title = 测试全面屏手机 88888, price=7888.0, pic=null}], old=[{title = 小米 8}]}]

转为 List<Dml> 对象:

[Dml {destination=''example'', database=''canal'', table=''product'', type=''UPDATE'', es=1652784882000, ts=1652784882435, sql='''', data=[{id=6, title = 小米 666, sub_title = 测试全面屏手机 66666, price=6666.0, pic=null}], old=[{title = 小米 6}]}, Dml {destination=''example'', database=''canal'', table=''product'', type=''UPDATE'', es=1652784882000, ts=1652784882437, sql='''', data=[{id=7, title = 小米 777, sub_title = 测试全面屏手机 77777, price=7777.0, pic=null}], old=[{title = 小米 7}]}, Dml {destination=''example'', database=''canal'', table=''product'', type=''UPDATE'', es=1652784882000, ts=1652784882437, sql='''', data=[{id=8, title = 小米 888, sub_title = 测试全面屏手机 88888, price=7888.0, pic=null}], old=[{title = 小米 8}]}]

单个 Dml  json 格式示例:

{
    "data":[
        {
            "id":6,
            "title":"小米 666",
            "sub_title":"测试全面屏手机 66666",
            "price":6666,
            "pic":null
        }

    ]
,
    "database":"canal",
    "destination":"example",
    "es":1652784882000,
    "groupId":"g1",
    "isDdl":false,
    "old":[
        {
            "title":"小米 6"
        }

    ]
,
    "pkNames":[
        "id"
    ]
,
    "sql":"",
    "table":"product",
    "ts":1652784882435,
    "type":"UPDATE"
}

--delete

原执行 sql:

DELETE FROM product where id in(6,7);
DELETE FROM product where id=8;

canalServer 解析 binlog 后转为对象后的内容 List<CommonMessage>:

[CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''DELETE'', es=1652785876000, ts=1652785876116, sql='''', data=[{id=6, title = 小米 6, sub_title = 测试全面屏手机 66666, price=6666.0, pic=null}, {id=7, title = 小米 7, sub_title = 测试全面屏手机 77777, price=7777.0, pic=null}], old=null}, CommonMessage {database=''canal'', table=''product'', pkNames=[id], isDdl=false, type=''DELETE'', es=1652785876000, ts=1652785876116, sql='''', data=[{id=8, title = 小米 8, sub_title = 测试全面屏手机 88888, price=7888.0, pic=null}], old=null}]

转为 List<Dml> 对象:

[Dml {destination=''example'', database=''canal'', table=''product'', type=''DELETE'', es=1652785876000, ts=1652785876116, sql='''', data=[{id=6, title = 小米 6, sub_title = 测试全面屏手机 66666, price=6666.0, pic=null}, {id=7, title = 小米 7, sub_title = 测试全面屏手机 77777, price=7777.0, pic=null}], old=null}, Dml {destination=''example'', database=''canal'', table=''product'', type=''DELETE'', es=1652785876000, ts=1652785876116, sql='''', data=[{id=8, title = 小米 8, sub_title = 测试全面屏手机 88888, price=7888.0, pic=null}], old=null}]

单个 Dml  json 格式示例:

{
    "data":[
        {
            "id":6,
            "title":"小米 6",
            "sub_title":"测试全面屏手机 66666",
            "price":6666,
            "pic":null
        }
,
        {
            "id":7,
            "title":"小米 7",
            "sub_title":"测试全面屏手机 77777",
            "price":7777,
            "pic":null
        }

    ]
,
    "database":"canal",
    "destination":"example",
    "es":1652785876000,
    "groupId":"g1",
    "isDdl":false,
    "old":null,
    "pkNames":[
        "id"
    ]
,
    "sql":"",
    "table":"product",
    "ts":1652785876116,
    "type":"DELETE"
}

Canal+Kafka 实现 MySQL 与 Redis 数据同步

Canal+Kafka 实现 MySQL 与 Redis 数据同步

思维导图

前言

在很多业务情况下,我们都会在系统中加入 redis 缓存做查询优化。

如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新 redis 的代码。

这种数据同步的代码跟业务代码糅合在一起会不太优雅,能不能把这些数据同步的代码抽出来形成一个独立的模块呢,答案是可以的。

架构图

canal 是一个伪装成 slave 订阅 mysql 的 binlog,实现数据同步的中间件。上一篇文章《canal 入门》

我已经介绍了最简单的使用方法,也就是 tcp 模式。

实际上 canal 是支持直接发送到 MQ 的,目前最新版是支持主流的三种 MQ:Kafka、RocketMQ、RabbitMQ。而 canal 的 RabbitMQ 模式目前是有一定的 bug,所以一般使用 Kafka 或者 RocketMQ。

本文使用 Kafka,实现 Redis 与 MySQL 的数据同步。架构图如下:

通过架构图,我们很清晰就知道要用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

下面演示 Kafka 的搭建,MySQL 搭建大家应该都会,ZooKeeper、Redis 这些网上也有很多资料参考。

搭建 Kafka

首先在官网下载安装包:

解压,打开 /config/server.properties 配置文件,修改日志目录:

log.dirs=./logs

首先启动 ZooKeeper,我用的是 3.6.1 版本:

接着再启动 Kafka,在 Kafka 的 bin 目录下打开 cmd,输入命令:

kafka-server-start.bat ../../config/server.properties

我们可以看到 ZooKeeper 上注册了 Kafka 相关的配置信息:

然后需要创建一个队列,用于接收 canal 传送过来的数据,使用命令:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic

创建的队列名是 canaltopic

配置 Cannal Server

canal 官网下载相关安装包:

找到 canal.deployer-1.1.4/conf 目录下的 canal.properties 配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example

然后配置 instance,找到 /conf/example/instance.properties 配置文件:

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)
# canal.instance.mysql.slaveId=0

# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ队列名称
canal.mq.topic=canaltopic
#单队列模式的分区下标
canal.mq.partition=0

配置完成后,就可以启动 canal 了。

测试

这时可以打开 kafka 的消费者窗口,测试一下 kafka 是否收到消息。

使用命令进行监听消费:

kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic

有个小坑。我这里使用的是 win10 系统的 cmd 命令行,win10 系统默认的编码是 GBK,而 Canal Server 是 UTF-8 的编码,所以控制台会出现乱码:

怎么解决呢?

在 cmd 命令行执行前切换到 UTF-8 编码即可,使用命令行:chcp 65001

然后再执行打开 kafka 消费端的命令,就不乱码了:

接下来就是启动 Redis,把数据同步到 Redis 就完事了。

封装 Redis 客户端

环境搭建完成后,我们可以写代码了。

首先引入 Kafka 和 Redis 的 maven 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

在 application.yml 文件增加以下配置:

spring:  
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password: 123456

封装一个操作 Redis 的工具类:

@Component
public class RedisClient {

    /**
     * 获取redis模版
     */

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 设置redis的key-value
     */

    public void setString(String key, String value) {
        setString(key, valuenull);
    }

    /**
     * 设置redis的key-value,带过期时间
     */

    public void setString(String key, String value, Long timeOut) {
        stringRedisTemplate.opsForValue().set(key, value);
        if (timeOut != null) {
            stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
        }
    }

    /**
     * 获取redis中key对应的值
     */

    public String getString(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * 删除redis中key对应的值
     */

    public Boolean deleteKey(String key) {
        return stringRedisTemplate.delete(key);
    }
}

创建 MQ 消费者进行同步

在 application.yml 配置文件加上 kafka 的配置信息:

spring:
  kafka:
   Kafka服务地址
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: consumer-group1
      #序列化反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

根据上面 Kafka 消费命令那里,我们知道了 json 数据的结构,可以创建一个 CanalBean 对象进行接收:

public class CanalBean {
    //数据
    private List<TbCommodityInfo> data;
    //数据库名称
    private String database;
    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}
public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    //getter、setter方法
}
public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}

最后就可以创建一个消费者 CanalConsumer 进行消费:

@Component
public class CanalConsumer {
 //日志记录
    private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);
 //redis操作工具类
    @Resource
    private RedisClient redisClient;
 //监听的队列名称为:canaltopic
    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
        //转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        //获取是否是DDL语句
        boolean isDdl = canalBean.getIsDdl();
        //获取类型
        String type = canalBean.getType();
        //不是DDL语句
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            //过期时间
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                //新增语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //新增到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else if ("UPDATE".equals(type)) {
                //更新语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //更新到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                //删除语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //从redis中删除
                    redisClient.deleteKey(id);
                }
            }
        }
    }
}

测试 MySQL 与 Redis 同步

mysql 对应的表结构如下:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32NOT NULL,
  `commodity_name` varchar(512DEFAULT NULL COMMENT ''商品名称'',
  `commodity_price` varchar(36DEFAULT ''0'' COMMENT ''商品价格'',
  `number` int(10DEFAULT ''0'' COMMENT ''商品数量'',
  `description` varchar(2048DEFAULT '''' COMMENT ''商品描述'',
  PRIMARY KEY (`id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=''商品信息表'';

首先在 MySQL 创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id``commodity_name``commodity_price``number``description`VALUES (''3e71a81fd80711eaaed600163e046cc3''''叉烧包''''3.99''''3''''又大又香的叉烧包,老人小孩都喜欢'');

tb_commodity_info 表查到新增的数据:

Redis 也查到了对应的数据,证明同步成功!

如果更新呢?试一下 Update 语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`=''青菜包'',`description`=''很便宜的青菜包呀,不买也开看看了喂'' WHERE `id`=''3e71a81fd80711eaaed600163e046cc3'';

没有问题!

总结

那么你会说,canal 就没有什么缺点吗?

肯定是有的:

  1. canal 只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些 bug,不过社区活跃度较高,对于提出的 bug 能及时修复。
  4. MQ 顺序性问题。我这里把官网的回答列出来,大家参考一下。

尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。

我们公司在同步 MySQL 数据到 Elastic Search 也是采用 Canal+RocketMQ 的方式。

参考资料:canal 官网

絮叨

上面所有例子的代码都上传 Github 了:

https://github.com/yehongzhi/mall

如果你觉得这篇文章对你有用,点个赞吧

你的点赞是我创作的最大动力

拒绝做一条咸鱼,我是一个努力让大家记住的程序员。我们下期再见!!!


本文分享自微信公众号 - java 技术爱好者(yehongzhi_java)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与 “OSC 源创计划”,欢迎正在阅读的你也加入,一起分享。

今天的关于mssql mysql 数据同步的分享已经结束,谢谢您的关注,如果想了解更多关于3、安装 Logstash 的 plugin,mysql 数据同步、5分钟搞定 关系型数据库 到 Flink 数据同步、canal 数据同步、Canal+Kafka 实现 MySQL 与 Redis 数据同步的相关知识,请在本站进行查询。

本文标签:

上一篇一、微软同步框架Sync Framework之SQL SERVER Compact to MSSQL

下一篇Centos7 下安装以及使用 mssql