Canal
原创大约 3 分钟
和Sqoop一样,Canal也可以将数据从关系型数据库(RDBMS)导入到Hadoop这种大数据系统中,只不过Canal是阿里巴巴开源的,而且更多侧重于对MySQL数据库的Binlog(日志)文件进行解析,提供增量的数据订阅和消费,而且它一直在更新,这一点比Sqoop要好很多。
准备环境
使用已有的环境:Docker部署MySQL。
授权canal
账号具有MySQL slave
的权限(如果已有账户可直接grant
)。
> USE mysql;
> CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
> FLUSH PRIVILEGES;
如果只是测试环境可以直接使用MySQL
数据库中的root
账号,不需要弄得这么麻烦。
对接Kafka
下载并修改canal
相关配置。
> mkdir /home/work/canal
> cd /home/work/canal
> wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
> tar zxf canal.deployer-1.1.4.tar.gz
> cp conf/example/instance.properties conf/example/instance.properties.bak
## 修改服务配置(把不需要的MQ都注释掉)
> vi conf/canal.properties
# 可选:tcp, kafka, rocketMQ
canal.serverMode = kafka
# 目标实例配置(指向conf/example/instance.properties)
canal.destinations = example
# 修改Kafka服务地址
kafka.bootstrap.servers = 172.16.185.176:9092
## 修改实例配置
> vi conf/example/instance.properties
# mysql serverId
canal.instance.mysql.slaveId = 9528
# position info,需要改成自己的数据库信息
canal.instance.master.address = 172.16.185.176:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password,需要改成自己的数据库信息(测试环境用root就好)
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.connectionCharset = UTF-8
# mq config
# test为topic名称,canal为监听的数据库名称,后面的..*,.*\\..*正则表达式意思是监听所有数据库中的所有表
canal.mq.dynamicTopic=test.canal\\..*,.*\\..*
# table regex
canal.instance.filter.regex = .\*\\\\..\*
管理Canal。
# 启动服务
> sh ./bin/startup.sh
# 查看Server日志
> tail -f ./logs/canal/canal.log
# 查看instance日志
> tail -f .logs/example/example.log
# 关闭服务
> sh ./bin/stop.sh
启动Canal
服务,然后再启动一个Kafka
消费者监听,就能在其中看到输出了。
拉取Canal数据
如果因为版本问题导致Canal
输出到Kafka
不成功,那么也可以通过编码的方式从Canal
拉取数据,然后再写入到Kafka
中去。
先引入依赖。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
同时修改Canal
服务属性文件canal.properties
中的配置。
> vi conf/canal.properties
# 不使用MQ时,这里要改为tcp
canal.serverMode = tcp
代码实现。
package itechthink.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
/**
* 从Canal中拉取数据
*
*/
public class CanalClient {
public static void main(String[] args) throws InvalidProtocolBufferException {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("172.16.185.176", 11111),
"example", null, null);
while (true) {
// 连接数据库
connector.connect();
// 订阅指定的数据库
connector.subscribe("canal.*");
Message message = connector.get(100);
List<CanalEntry.Entry> entries = message.getEntries();
if (!entries.isEmpty()) {
for (CanalEntry.Entry entry : entries) {
// 获取表名
String tableName = entry.getHeader().getTableName();
// 拿到数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
// 根据类型进行判断
switch (rowChange.getEventType()) {
case INSERT:
System.out.println("INSERT");
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> columnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : columnsList) {
System.out.println(column.getName() + ":" + column.getValue());
}
}
// 把数据发送到想要输出的地方
// TODO...
break;
case UPDATE:
System.out.println("UPDATE");
break;
case DELETE:
System.out.println("DELETE");
break;
default:
break;
}
}
}
System.out.println("没有数据");
}
}
}
感谢支持
更多内容,请移步《超级个体》。