This article is a summary of my own learning; corrections for any errors or omissions are welcome.
Version information
Canal.adapter: v1.1.6
Canal.deployer: v1.1.6
ES: v7.12.1
RabbitMQ: v3.8.5
Redis: v6.2.6
Configuration process
Canal.deployer
Modify the database-listening configuration in instance.properties under the conf->example folder
# position info
canal.instance.master.address= # replace with your own MySQL address and port
# username/password
canal.instance.dbUsername=canal # replace with the account you created in MySQL
canal.instance.dbPassword=canal # replace with the corresponding password
# mq config
canal.mq.topic=canal-routing-key # replace with the Routing Key of the Exchange you created in RabbitMQ
The rest does not need to be changed.
Start Canal.deployer
Canal.adapter
First, comment out all the contents of bootstrap.yml in the conf directory.
Modify application.yml in the conf directory
canal.conf:
  mode: rabbitMQ # tcp kafka rocketMQ rabbitMQ -- set this to rabbitMQ
  ......
  consumerProperties:
    # rabbitMQ consumer
    rabbitmq.host: # fill in your own RabbitMQ address here, WITHOUT the port!
    rabbitmq.virtual.host: /
    rabbitmq.username: # account
    rabbitmq.password: # password
    rabbitmq.resource.ownerId: # no need to fill this in
  ......
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/your_db_table_name # this seems to be auto-generated, double-check it
      username: canal # the account created in MySQL
      password: canal # the corresponding password
  canalAdapters:
  - instance: canal-example # name of the queue in RabbitMQ
    groups:
    - groupId: g1 # this just needs to match the groupId in the es7 mapping file
      outerAdapters:
      # - name: logger
      - name: rdb # replace with the adapter you created; mine is under the rdb folder
        key: canal-routing-key # the RabbitMQ Routing Key
      ......
      - name: es7
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode; change to your own ES address (my ES and Canal are on the same VM), and remember to include http://
        properties:
          mode: rest # set the mode to rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
Go into the conf->es7 directory and create a file named after the table you want to listen to, e.g. example.yml
Write the following into example.yml:
dataSourceKey: defaultDS # corresponds to defaultDS under srcDataSources in canal.adapter/conf/application.yml
destination: canal-example # name of the queue in RabbitMQ
groupId: g1
esMapping:
  _index: example # index name in ES
  _id: _id
  sql: "select u.name as _id,u.id from user u" # the SQL that queries the corresponding table in MySQL
  commitBatch: 1
Create the adapter
Mine is under the rdb folder and is named mytest_user.yml
dataSourceKey: defaultDS # corresponds to srcDataSources in application.yml above
destination: canal-example # name of the queue in RabbitMQ
groupId: g1 # keep consistent with the above
outerAdapterKey: canal-routing-key # the RabbitMQ Routing Key
concurrent: true
dbMapping:
  database: mytest
  table: user
  targetTable: mytest2.user
  targetPk:
    id: id
  mapAll: true # I set this to true because I query all columns; change as needed
  # targetColumns:
  #   id:
  #   name:
  #   role_id:
  #   c_time:
  #   test1:
  etlCondition: "where c_time>={}"
  commitBatch: 3000 # batch commit size

## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest
Create the ES index in Kibana, defined according to your own needs
PUT /article { "mappings": { "properties": { "id" : { "type": "integer" }, "user_id":{ "type": "integer" } } } }
Start canal.adapter
RabbitMQ
Create an exchange; the Type I chose is direct.
Create a queue, then bind it to the corresponding exchange using the Routing Key (a code sketch of this step follows below).
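If you prefer to set up the exchange, queue, and binding from code instead of the management UI, a minimal sketch using the RabbitMQ Java client could look like the following. The exchange name canal-exchange, address, and credentials are assumptions; the queue name and routing key follow the values used above.

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitmqTopologySetup {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1"); // assumed RabbitMQ address
        factory.setUsername("guest"); // assumed credentials
        factory.setPassword("guest");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Durable direct exchange; "canal-exchange" is a placeholder name
            channel.exchangeDeclare("canal-exchange", BuiltinExchangeType.DIRECT, true);
            // Durable queue matching the instance name used in application.yml
            channel.queueDeclare("canal-example", true, false, false, null);
            // Bind the queue to the exchange with the routing key Canal publishes to
            channel.queueBind("canal-example", "canal-exchange", "canal-routing-key");
        }
    }
}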
Spring Boot code
RabbitMQ connection pool: connection
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqConnection {
    private Connection connection;

    public RabbitmqConnection(String host, int port, String userName, String password, String virtualhost) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get the connection
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * Close the connection
     */
    public void close() {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
connectionPool
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RabbitmqConnectionPool {
    private static BlockingQueue<RabbitmqConnection> pool;

    public static void initRabbitmqConnectionPool(String host, int port, String userName, String password,
                                                  String virtualhost, Integer poolSize) {
        pool = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new RabbitmqConnection(host, port, userName, password, virtualhost));
        }
    }

    public static RabbitmqConnection getConnection() throws InterruptedException {
        return pool.take();
    }

    public static void returnConnection(RabbitmqConnection connection) {
        pool.add(connection);
    }

    public static void close() {
        pool.forEach(RabbitmqConnection::close);
    }
}
Create a consumer to consume the messages
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;

@Slf4j
@Service
public class CanalMQServiceImpl implements CanalMQService {

    @Override
    public void consumeCanalMsg(String exchange, String queueName, String routingKey) {
        try {
            // Get a connection from the pool
            RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
            Connection connection = rabbitmqConnection.getConnection();
            // Create a channel
            final Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(queueName, true, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(queueName, exchange, routingKey);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    log.info("Consumer canal msg: {}", message);
                    // Parse the RabbitMQ message and store it in Redis
                    CanalMessage<Map<String, String>> canalMessage = JsonUtil.toObj(message, CanalMessage.class);
                    String cacheKey = "canal_rabbitmq_" + canalMessage.getId();
                    if ("INSERT".equalsIgnoreCase(canalMessage.getType())) {
                        RedisClient.setStr(cacheKey, canalMessage.getData().get(0).get("id"));
                    } else if ("UPDATE".equalsIgnoreCase(canalMessage.getType())
                            || "DELETE".equalsIgnoreCase(canalMessage.getType())) {
                        RedisClient.del(cacheKey);
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // Disable auto-ack
            channel.basicConsume(queueName, false, consumer);
            // channel.close();
            RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void processConsumerCanalMsg() {
        log.info("Begin to processConsumerCanalMsg.");
        Integer stepTotal = 1;
        Integer step = 0;
        while (true) {
            step++;
            try {
                log.info("processConsumerCanalMsg cycle.");
                // i.e. the exchange, queue and routing key created in RabbitMQ
                consumeCanalMsg(CanalConstant.CANALEXCHANGE, CanalConstant.CANALQUEUE, CanalConstant.CANALROUTING);
                if (step.equals(stepTotal)) {
                    Thread.sleep(10000);
                    step = 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
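The consumer above relies on two small helpers, JsonUtil and RedisClient, that are not shown in this post. Below is a minimal sketch of what they might look like, assuming Jackson for JSON and Jedis for Redis; the Redis address is a placeholder, and in a real project each class would live in its own file.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

// JsonUtil: thin wrapper around Jackson, used by the consumer to parse Canal messages
class JsonUtil {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static <T> T toObj(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse JSON", e);
        }
    }
}

// RedisClient: thin wrapper around a Jedis pool; 127.0.0.1:6379 is an assumption
class RedisClient {
    private static final JedisPool POOL = new JedisPool("127.0.0.1", 6379);

    public static void setStr(String key, String value) {
        try (Jedis jedis = POOL.getResource()) {
            jedis.set(key, value);
        }
    }

    public static void del(String key) {
        try (Jedis jedis = POOL.getResource()) {
            jedis.del(key);
        }
    }
}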
Create a thread that keeps fetching messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
@ConditionalOnProperty(value = "rabbitmq.switchFlag")
@EnableConfigurationProperties(RabbitmqProperties.class)
public class RabbitMqAutoConfig implements ApplicationRunner {

    @Autowired
    private RabbitmqProperties rabbitmqProperties;

    @Resource
    private CanalMQService canalMQService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String host = rabbitmqProperties.getHost();
        Integer port = rabbitmqProperties.getPort();
        String userName = rabbitmqProperties.getUsername();
        String password = rabbitmqProperties.getPassport();
        String virtualhost = rabbitmqProperties.getVirtualhost();
        Integer poolSize = rabbitmqProperties.getPoolSize();
        RabbitmqConnectionPool.initRabbitmqConnectionPool(host, port, userName, password, virtualhost, poolSize);
        AsyncUtil.execute(() -> canalMQService.processConsumerCanalMsg());
    }
}
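The config class above also assumes a RabbitmqProperties class (bound to the rabbitmq.* prefix, including the rabbitmq.switchFlag toggle) and an AsyncUtil helper, neither of which is shown. A minimal sketch, with field names chosen only to match the getters used above:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Bound via @EnableConfigurationProperties(RabbitmqProperties.class) in RabbitMqAutoConfig
@Data
@ConfigurationProperties(prefix = "rabbitmq")
class RabbitmqProperties {
    private Boolean switchFlag;  // rabbitmq.switchFlag enables the auto-config
    private String host;
    private Integer port;
    private String username;
    private String passport;     // named only to match getPassport() above; holds the RabbitMQ password
    private String virtualhost;
    private Integer poolSize;
}

// AsyncUtil: runs the consumer loop on a background thread
class AsyncUtil {
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

    public static void execute(Runnable task) {
        EXECUTOR.execute(task);
    }
}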
Create an entity class for the data that Canal publishes to RabbitMQ (you can run the program first, grab a message from RabbitMQ, and then let an AI generate the class)
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

@NoArgsConstructor
@Data
public class CanalMessage<T> {
    private List<T> data;
    private String database;
    private String destination;
    private Long es;
    private String groupId;
    private Boolean isDdl;
    private Map<String, String> mysqlType;
    private List<T> old;
    private List<String> pkNames;
    private String sql;
    private Map<String, Integer> sqlType;
    private String table;
    private Long ts;
    private String type;
    private Integer id;
}
At this point the program should run successfully.
Consumer canal msg: {"data":[{"id":"18","host":"192.168.XX.XXX","cnt":"121","date":"2024-06-01","create_time":"2024-06-01 11:28:05","update_time":"2024-06-01 21:38:11"}],"database":"DB","es":1717249091000,"id":177,"isDdl":false,"mysqlType":{"id":"bigint unsigned","host":"varchar(32)","cnt":"int unsigned","date":"date","create_time":"timestamp","update_time":"timestamp"},"old":[{"cnt":"120","update_time":"2024-06-01 21:38:08"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"host":12,"cnt":4,"date":91,"create_time":93,"update_time":93},"table":"request_count","ts":1717249122683,"type":"UPDATE"}
The console should print messages like this, which means the synchronization works. The code above only implements the bare functionality; in practice you can process messages differently depending on the message type or the table name, as sketched below. Feel free to share your own approaches. That's all!
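As an illustration of that table/type-based dispatch, handleDelivery could pass the parsed message to a method like the one below instead of handling only one case; the branch bodies and table names are placeholders to adapt to your own schema.

// Hypothetical dispatcher called from handleDelivery after parsing the message
private void dispatch(CanalMessage<Map<String, String>> msg) {
    if (Boolean.TRUE.equals(msg.getIsDdl())) {
        return; // skip DDL statements
    }
    switch (msg.getTable()) {
        case "user":
            if ("INSERT".equalsIgnoreCase(msg.getType()) || "UPDATE".equalsIgnoreCase(msg.getType())) {
                // e.g. refresh the user entry in Redis / re-index the document in ES
            } else if ("DELETE".equalsIgnoreCase(msg.getType())) {
                // e.g. evict the cache entry and delete the ES document
            }
            break;
        case "request_count":
            // table-specific handling
            break;
        default:
            // ignore tables we do not care about
            break;
    }
}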