本文是个人学习后的总结,如有不足与错误之处,欢迎各位大佬指正。
版本信息
Canal.adapter: v1.1.6
Canal.deployer: v1.1.6
ES: v7.12.1
rabbitMQ: v3.8.5
Redis: v6.2.6
配置过程
Canal.deployer
修改conf->example文件夹下的instance.properties中监听数据库的配置
# position info canal.instance.master.address= #此处替换为自己的MySQL地址和端口 # username/password canal.instance.dbUsername=canal #此处替换为自己在MySQL中创建的账号 canal.instance.dbPassword=canal #替换为对应的密码 # mq config canal.mq.topic=canal-routing-key #此处替换为自己在RabbitMQ中创建的对应Exchange的Routing Key
其余部分不用更改
启动Canal.deployer
Canal.adapter
先将conf目录下的bootstrap.yml中的内容全部注释掉
修改conf目录下的application.yml
canal.conf: mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ 此处改为rabbitMQ ...... consumerProperties: # rabbitMQ consumer rabbitmq.host: #此处填入自己的rabbitMQ地址,不加端口,不加端口,不加端口! rabbitmq.virtual.host: / rabbitmq.username: #账号 rabbitmq.password: #密码 rabbitmq.resource.ownerId: #此处不用填 ...... srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/your_db_table_name # 这里似乎是自动生成的,注意检查 username: canal #MySQL中创建的账号 password: canal #对应的密码 canalAdapters: - instance: canal-example # rabbitMQ中队列的名字 groups: - groupId: g1 # 这里应该是和es7文件里的保持一致就行 outerAdapters: # - name: logger - name: rdb # 替换为自己创建的adapter适配器,我的在rdb文件夹下 key: canal-routing-key # rabbitMQ的Routing Key ...... - name: es7 hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode 这里改为自己的es地址(我的es和canal在同一台虚拟机中),记得加http://,记得加http://,记得加http://。 properties: mode: rest # or rest 模式改为rest # security.auth: test:123456 # only used for rest mode cluster.name: elasticsearch
进入conf->es7目录,创建一个以自己要监听的表名命名的yml文件,例如:example.yml
进入example.yml中写入代码:
dataSourceKey: defaultDS #对应canal.adapter/conf/application.yml 文件中的srcDataSources下的defaultDS destination: canal-example # rabbitMQ中队列的名字 groupId: g1 esMapping: _index: example # es中的索引名 _id: _id sql: "select u.name as _id,u.id from user u" # 此处是对应查询MySQL中相应的表的语句 commitBatch: 1
创建adapter适配器
我的在rdb文件夹下,叫mytest_user.yml
dataSourceKey: defaultDS #对应上面application.yml的srcDataSource destination: canal-example # rabbitMQ队列的名字 groupId: g1 # 和上面保持一致 outerAdapterKey: canal-routing-key # rabbitMQ的Routing Key concurrent: true dbMapping: database: mytest table: user targetTable: mytest2.user targetPk: id: id mapAll: true # 此处我改为了true,因为我查询的时候是所有数据都查出来,可以按需更改 # targetColumns: # id: # name: # role_id: # c_time: # test1: etlCondition: "where c_time>={}" commitBatch: 3000 # 批量提交的大小 ## Mirror schema synchronize config #dataSourceKey: defaultDS #destination: example #groupId: g1 #outerAdapterKey: mysql1 #concurrent: true #dbMapping: # mirrorDb: true # database: mytest
在kibana中创建ES的索引,根据自己的需求编写
PUT /article { "mappings": { "properties": { "id" : { "type": "integer" }, "user_id":{ "type": "integer" } } } }
启动canal.adapter
rabbitMQ
创建exchange,我选的Type是direct
创建queue,然后根据Routing Key绑定到对应的exchange
springboot代码
rabbitMQ连接池 connection
/**
 * Wrapper around a single RabbitMQ {@link Connection} created from the given
 * host, port, credentials and virtual host.
 *
 * <p>If the connection cannot be established the failure is printed and the
 * wrapped connection stays {@code null}; callers of {@link #getConnection()}
 * must be prepared for that.
 */
public class RabbitmqConnection {

    /** Underlying connection; {@code null} when the constructor failed to connect. */
    private Connection connection;

    /**
     * Opens a connection to the broker.
     *
     * @param host        broker host name or address
     * @param port        broker port
     * @param userName    login user
     * @param password    login password
     * @param virtualhost virtual host to use
     */
    public RabbitmqConnection(String host, int port, String userName, String password, String virtualhost) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            // Best effort: the failure is reported and the wrapper is left without a connection.
            e.printStackTrace();
        }
    }

    /**
     * Returns the wrapped connection.
     *
     * @return the connection, or {@code null} if it could not be opened
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * Closes the wrapped connection. Does nothing when the connection was never
     * opened or is no longer open.
     */
    public void close() {
        // The constructor may have left the field null, and closing an already
        // closed connection is pointless, so both cases are skipped.
        if (connection == null || !connection.isOpen()) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
connectionPool
/**
 * Static, fixed-size pool of {@link RabbitmqConnection} objects backed by a
 * bounded {@link BlockingQueue}.
 *
 * <p>{@link #initRabbitmqConnectionPool} must be called before connections are
 * borrowed.
 */
public class RabbitmqConnectionPool {

    /** Idle connections; {@code null} until the pool has been initialised. */
    private static BlockingQueue<RabbitmqConnection> pool;

    /**
     * Creates the pool and fills it with {@code poolSize} new connections.
     *
     * @param host        broker host name or address
     * @param port        broker port
     * @param userName    login user
     * @param password    login password
     * @param virtualhost virtual host to use
     * @param poolSize    number of connections to create; also the queue capacity
     */
    public static void initRabbitmqConnectionPool(String host, int port, String userName, String password, String virtualhost, Integer poolSize) {
        pool = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new RabbitmqConnection(host, port, userName, password, virtualhost));
        }
    }

    /**
     * Borrows a connection, blocking until one is available.
     *
     * @return a pooled connection wrapper
     * @throws InterruptedException  if interrupted while waiting
     * @throws IllegalStateException if the pool has not been initialised
     */
    public static RabbitmqConnection getConnection() throws InterruptedException {
        if (pool == null) {
            throw new IllegalStateException("RabbitmqConnectionPool has not been initialised");
        }
        return pool.take();
    }

    /**
     * Gives a borrowed connection back to the pool.
     *
     * @param connection the connection to return; {@code null} is ignored
     */
    public static void returnConnection(RabbitmqConnection connection) {
        // The queue rejects null elements, and there is nothing to return to
        // when the pool was never created.
        if (connection == null || pool == null) {
            return;
        }
        pool.add(connection);
    }

    /**
     * Closes every connection currently sitting idle in the pool. Does nothing
     * when the pool was never initialised.
     */
    public static void close() {
        if (pool == null) {
            return;
        }
        pool.forEach(RabbitmqConnection::close);
    }
}
创建消费者消费消息
@Slf4j @Service public class CanalMQServiceImpl implements CanalMQService { @Override public void consumeCanalMsg(String exchange, String queueName, String routingKey) { try { //创建连接 RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection(); Connection connection = rabbitmqConnection.getConnection(); //创建消息信道 final Channel channel = connection.createChannel(); //消息队列 channel.queueDeclare(queueName, true, false, false, null); //绑定队列到交换机 channel.queueBind(queueName, exchange, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info("Consumer canal msg: {}", message); // 获取Rabbitmq消息,并保存到Redis CanalMessage<Map<String, String>> canalMessage = JsonUtil.toObj(message, CanalMessage.class); String cacheKey = "canal_rabbitmq_" + canalMessage.getId(); if("INSERT".equalsIgnoreCase(canalMessage.getType())){ RedisClient.setStr(cacheKey, canalMessage.getData().get(0).get("id")); }else if("UPDATE".equalsIgnoreCase(canalMessage.getType()) || "DELETE".equalsIgnoreCase(canalMessage.getType())){ RedisClient.del(cacheKey); } channel.basicAck(envelope.getDeliveryTag(), false); } }; // 取消自动ack channel.basicConsume(queueName, false, consumer); // channel.close(); RabbitmqConnectionPool.returnConnection(rabbitmqConnection); } catch (InterruptedException | IOException e) { e.printStackTrace(); } } @Override public void processConsumerCanalMsg() { log.info("Begin to processConsumerCanalMsg."); Integer stepTotal = 1; Integer step = 0; while (true) { step++; try { log.info("processConsumerCanalMsg cycle."); consumeCanalMsg(CanalConstant.CANALEXCHANGE, CanalConstant.CANALQUEUE, CanalConstant.CANALROUTING); # 即rabbitMQ中的交换机,队列,routing key if (step.equals(stepTotal)) { Thread.sleep(10000); step = 0; } } catch (Exception e) { e.printStackTrace(); } } } }
创建线程持续获取消息:
@Configuration @ConditionalOnProperty(value = "rabbitmq.switchFlag") @EnableConfigurationProperties(RabbitmqProperties.class) public class RabbitMqAutoConfig implements ApplicationRunner { @Autowired private RabbitmqProperties rabbitmqProperties; @Resource private CanalMQService canalMQService; @Override public void run(ApplicationArguments args) throws Exception { String host = rabbitmqProperties.getHost(); Integer port = rabbitmqProperties.getPort(); String userName = rabbitmqProperties.getUsername(); String password = rabbitmqProperties.getPassport(); String virtualhost = rabbitmqProperties.getVirtualhost(); Integer poolSize = rabbitmqProperties.getPoolSize(); RabbitmqConnectionPool.initRabbitmqConnectionPool(host, port, userName, password, virtualhost, poolSize); AsyncUtil.execute(() -> canalMQService.processConsumerCanalMsg()); } }
创建canal上传到rabbitMQ中的数据的对应实体(可以先运行,从rabbitMQ中获取一条消息,然后让AI写)
/**
 * Entity mirroring the JSON message that Canal publishes to RabbitMQ.
 *
 * @param <T> type of one row entry in {@code data} / {@code old}
 */
@Data
@NoArgsConstructor
public class CanalMessage<T> {

    /** Rows carried by the message. */
    private List<T> data;

    /** Name of the source database. */
    private String database;

    private String destination;

    /** Numeric timestamp field {@code es} of the message. */
    private Long es;

    private String groupId;

    /** Flag field {@code isDdl} of the message. */
    private Boolean isDdl;

    /** Column name to MySQL column type. */
    private Map<String, String> mysqlType;

    /** Previous values of the changed columns. */
    private List<T> old;

    /** Names of the primary-key columns. */
    private List<String> pkNames;

    private String sql;

    /** Column name to numeric SQL type code. */
    private Map<String, Integer> sqlType;

    /** Name of the source table. */
    private String table;

    /** Numeric timestamp field {@code ts} of the message. */
    private Long ts;

    /** Operation type, e.g. {@code INSERT}, {@code UPDATE}, {@code DELETE}. */
    private String type;

    /** Identifier field {@code id} of the message itself. */
    private Integer id;
}
此时运行程序应该就能成功运行。
Consumer canal msg: {"data":[{"id":"18","host":"192.168.XX.XXX","cnt":"121","date":"2024-06-01","create_time":"2024-06-01 11:28:05","update_time":"2024-06-01 21:38:11"}],"database":"DB","es":1717249091000,"id":177,"isDdl":false,"mysqlType":{"id":"bigint unsigned","host":"varchar(32)","cnt":"int unsigned","date":"date","create_time":"timestamp","update_time":"timestamp"},"old":[{"cnt":"120","update_time":"2024-06-01 21:38:08"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"host":12,"cnt":4,"date":91,"create_time":93,"update_time":93},"table":"request_count","ts":1717249122683,"type":"UPDATE"}
控制台应该输出这种消息,即表明同步成功。 上述给的代码只是简略实现了功能,实际上可以根据获取的消息的type,或者根据表名进行不同的处理,欢迎大家交流处理方法,结束~~~