切換語言為:簡體

整合Canal,ES,MySQL,rabbitMQ與Redis實現資料同步

  • 爱糖宝
  • 2024-06-02
  • 2124
  • 0
  • 0

本文是個人學習後的總結,如有不足與錯誤之處,歡迎各位大佬指正。

版本資訊

  • Canal.adapter: v1.1.6

  • Canal.depolyer: v1.1.6

  • ES: v7.12.1

  • rabbitMQ: v3.8.5

  • Redis: v6.2.6

配置過程

Canal.depolyer

  1. 修改conf->example資料夾下的instance.properties中監聽資料庫的配置

# position info
canal.instance.master.address= #此處替換為自己的MySQL地址和埠

# username/password
canal.instance.dbUsername=canal #此處替換為自己在MySQL中建立的賬號
canal.instance.dbPassword=canal #替換為對應的密碼

# mq config
canal.mq.topic=canal-routing-key #此處替換為自己在RabbitMQ中建立的對應Exchange的Routing Key

其餘部分不用更改

  1. 啟動Canal.deployer

Canal.adapter

  1. 先將conf目錄下的bootstrap.yml中的內容全部註釋掉

  2. 修改conf目錄下的application.yml

canal.conf:
  mode: rabbitMQ #tcp kafka rocketMQ rabbitMQ 此處改為rabbitMQ
  ......
  consumerProperties:
    # rabbitMQ consumer
    rabbitmq.host: #此處填入自己的rabbitMQ地址,不加埠,不加埠,不加埠!
    rabbitmq.virtual.host: /
    rabbitmq.username: #賬號
    rabbitmq.password: #密碼
    rabbitmq.resource.ownerId: #此處不用填
  ......
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/your_db_table_name # 這裏似乎是自動生成的,注意檢查
      username: canal #MySQL中建立的賬號
      password: canal #對應的密碼
  canalAdapters:
  - instance: canal-example # rabbitMQ中佇列的名字
    groups:
    - groupId: g1 # 這裏應該是和es7檔案裡的保持一致就行
      outerAdapters:
#      - name: logger
      - name: rdb # 替換為自己建立的adapter介面卡,我的在rdb資料夾下
        key: canal-routing-key # rabbitMQ的Routing Key
      ......
      - name: es7
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode 這裏改為自己的es地址(我的es和canal在同一臺虛擬機器中),記得加http://,記得加http://,記得加http://。
        properties:
          mode: rest # or rest 模式改為rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch

  1. 進入conf->es7目錄中建立檔案 建立自己要監聽的表名.yml,例如:example.yml

  2. 進入example.yml中寫入程式碼:

dataSourceKey: defaultDS #對應canal.adapter/conf/application.yml 檔案中的srcDataSources下的defaultDS
destination: canal-example # rabbitMQ中佇列的名字
groupId: g1
esMapping:
  _index: example # es中的索引名 
  _id: _id
  sql: "select u.name as _id,u.id from user u" # 此處是對應查詢MySQL中相應的表的語句
  commitBatch: 1

  1. 建立adapter介面卡

我的在rdb資料夾下,叫mytest_user.yml

dataSourceKey: defaultDS #對應上面application.yml的srcDataSource
destination: canal-example # rabbitMQ佇列的名字
groupId: g1 # 和上面保持一致
outerAdapterKey: canal-routing-key # rabbitMQ的Routing Key
concurrent: true
dbMapping:
  database: mytest
  table: user
  targetTable: mytest2.user
  targetPk:
    id: id
  mapAll: true # 此處我改爲了true,因為我查詢的時候是所有資料都查出來,可以按需更改
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
  etlCondition: "where c_time>={}"
  commitBatch: 3000 # 批次提交的大小


## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest

  1. 在kibana中建立ES的索引,根據自己的需求編寫

PUT /article
{
  "mappings": {
    "properties": {
      "id" : {
        "type": "integer"
      },
      "user_id":{
        "type": "integer"
      }  
    }
  }
}

  1. 啟動canal.adapter

rabbitMQ

  1. 建立exchange,我選的Type是direct

  2. 建立queue,然後根據Routing Key繫結到對應的exchange

springboot程式碼

rabbitMQ連線池 connection

public class RabbitmqConnection {

    private Connection connection;

    public RabbitmqConnection(String host, int port, String userName, String password, String virtualhost) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    /**
     * 獲取連結
     *
     * @return
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * 關閉連結
     *
     */
    public void close() {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

connectionPool

public class RabbitmqConnectionPool {

    private static BlockingQueue<RabbitmqConnection> pool;

    public static void initRabbitmqConnectionPool(String host, int port, String userName, String password,
                                             String virtualhost,
                                           Integer poolSize) {
        pool = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new RabbitmqConnection(host, port, userName, password, virtualhost));
        }
    }

    public static RabbitmqConnection getConnection() throws InterruptedException {
        return pool.take();
    }


    public static void returnConnection(RabbitmqConnection connection) {
        pool.add(connection);
    }

    public static void close() {
        pool.forEach(RabbitmqConnection::close);
    }
}

建立消費者消費訊息

@Slf4j
@Service
public class CanalMQServiceImpl implements CanalMQService {

    @Override
    public void consumeCanalMsg(String exchange,
                                String queueName,
                                String routingKey) {

        try {
            //建立連線
            RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
            Connection connection = rabbitmqConnection.getConnection();
            //建立訊息通道
            final Channel channel = connection.createChannel();
            //訊息佇列
            channel.queueDeclare(queueName, true, false, false, null);
            //繫結佇列到交換機
            channel.queueBind(queueName, exchange, routingKey);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    log.info("Consumer canal msg: {}", message);

                    // 獲取Rabbitmq訊息,並儲存到Redis
                    CanalMessage<Map<String, String>> canalMessage = JsonUtil.toObj(message, CanalMessage.class);
                    String cacheKey = "canal_rabbitmq_"
                            + canalMessage.getId();

                    if("INSERT".equalsIgnoreCase(canalMessage.getType())){
                        RedisClient.setStr(cacheKey, canalMessage.getData().get(0).get("id"));
                    }else if("UPDATE".equalsIgnoreCase(canalMessage.getType()) || "DELETE".equalsIgnoreCase(canalMessage.getType())){
                        RedisClient.del(cacheKey);
                    }

                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 取消自動ack
            channel.basicConsume(queueName, false, consumer);
            // channel.close();
            RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void processConsumerCanalMsg() {
        log.info("Begin to processConsumerCanalMsg.");

        Integer stepTotal = 1;
        Integer step = 0;

        while (true) {
            step++;
            try {
                log.info("processConsumerCanalMsg cycle.");
                consumeCanalMsg(CanalConstant.CANALEXCHANGE, CanalConstant.CANALQUEUE,
                        CanalConstant.CANALROUTING); # 即rabbitMQ中的交換機,佇列,routing key
                if (step.equals(stepTotal)) {
                    Thread.sleep(10000);
                    step = 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

建立執行緒持續獲取訊息:

@Configuration
@ConditionalOnProperty(value = "rabbitmq.switchFlag")
@EnableConfigurationProperties(RabbitmqProperties.class)
public class RabbitMqAutoConfig implements ApplicationRunner {

    @Autowired
    private RabbitmqProperties rabbitmqProperties;

    @Resource
    private CanalMQService canalMQService;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        String host = rabbitmqProperties.getHost();
        Integer port = rabbitmqProperties.getPort();
        String userName = rabbitmqProperties.getUsername();
        String password = rabbitmqProperties.getPassport();
        String virtualhost = rabbitmqProperties.getVirtualhost();
        Integer poolSize = rabbitmqProperties.getPoolSize();
        RabbitmqConnectionPool.initRabbitmqConnectionPool(host, port, userName, password, virtualhost, poolSize);
        AsyncUtil.execute(() -> canalMQService.processConsumerCanalMsg());
    }
}

建立canal上傳到rabbitMQ中的資料的對應實體(可以先執行,從rabbitMQ中獲取一條訊息,然後讓AI寫)

@NoArgsConstructor
@Data
public class CanalMessage<T> {
    private List<T> data;
    private String database;
    private String destination;
    private Long es;
    private String groupId;
    private Boolean isDdl;
    private Map<String,String> mysqlType;
    private List<T> old;
    private List<String> pkNames;
    private String sql;
    private Map<String,Integer> sqlType;
    private String table;
    private Long ts;
    private String type;
    private Integer id;
}

此時執行程式應該就能成功執行。

Consumer canal msg: {"data":[{"id":"18","host":"192.168.XX.XXX","cnt":"121","date":"2024-06-01","create_time":"2024-06-01 11:28:05","update_time":"2024-06-01 21:38:11"}],"database":"DB","es":1717249091000,"id":177,"isDdl":false,"mysqlType":{"id":"bigint unsigned","host":"varchar(32)","cnt":"int unsigned","date":"date","create_time":"timestamp","update_time":"timestamp"},"old":[{"cnt":"120","update_time":"2024-06-01 21:38:08"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"host":12,"cnt":4,"date":91,"create_time":93,"update_time":93},"table":"request_count","ts":1717249122683,"type":"UPDATE"}

控制檯應該輸出這種訊息,即表明同步成功。 上述給的程式碼只是簡略實現了功能,實際上可以根據獲取的訊息的type,或者根據表名進行不同的處理,歡迎大家交流處理方法,結束~~~

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.