
Integrating Canal, ES, MySQL, RabbitMQ, and Redis for Data Synchronization

  • 爱糖宝
  • 2024-06-02

This post is a summary of my own learning; if anything is missing or wrong, corrections are welcome.

Version Information

  • Canal.adapter: v1.1.6

  • Canal.deployer: v1.1.6

  • ES: v7.12.1

  • RabbitMQ: v3.8.5

  • Redis: v6.2.6

Configuration Process

Canal.deployer

  1. Modify the database-listening settings in instance.properties under the conf->example folder:

# position info
canal.instance.master.address= # replace with your own MySQL address and port

# username/password
canal.instance.dbUsername=canal # replace with the account you created in MySQL
canal.instance.dbPassword=canal # replace with the matching password

# mq config
canal.mq.topic=canal-routing-key # replace with the Routing Key of the Exchange you created in RabbitMQ

The rest of this file can be left unchanged.
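Note that the deployer only publishes to RabbitMQ once conf/canal.properties is switched to the rabbitMQ server mode and pointed at the broker. A rough sketch of the relevant entries, assuming the v1.1.6 template (double-check the property names against your own file; the exchange name and credentials below are placeholders):

# set the server mode to rabbitMQ
canal.serverMode = rabbitMQ

# RabbitMQ connection used by the deployer (exchange name and credentials are placeholders)
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = guest
rabbitmq.password = guest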

  2. Start Canal.deployer

Canal.adapter

  1. First, comment out everything in bootstrap.yml under the conf directory.

  2. Modify application.yml under the conf directory:

canal.conf:
  mode: rabbitMQ # tcp kafka rocketMQ rabbitMQ; set this to rabbitMQ
  ......
  consumerProperties:
    # rabbitMQ consumer
    rabbitmq.host: # your RabbitMQ address; do NOT append the port!
    rabbitmq.virtual.host: /
    rabbitmq.username: # account
    rabbitmq.password: # password
    rabbitmq.resource.ownerId: # leave this empty
  ......
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/your_db_table_name # this seems to be generated automatically, double-check it
      username: canal # the account created in MySQL
      password: canal # the matching password
  canalAdapters:
  - instance: canal-example # name of the queue in RabbitMQ
    groups:
    - groupId: g1 # just needs to match the groupId in the es7 mapping file
      outerAdapters:
#      - name: logger
      - name: rdb # replace with your own adapter; mine is under the rdb folder
        key: canal-routing-key # the RabbitMQ Routing Key
      ......
      - name: es7
        hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode; use your own ES address (my ES and Canal run on the same VM) and remember the http:// prefix
        properties:
          mode: rest # transport or rest; set this to rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch

  3. In the conf->es7 directory, create a .yml file named after the table you want to listen to, e.g. example.yml

  4. In example.yml, add the following:

dataSourceKey: defaultDS # matches defaultDS under srcDataSources in canal.adapter/conf/application.yml
destination: canal-example # name of the queue in RabbitMQ
groupId: g1
esMapping:
  _index: example # the index name in ES
  _id: _id
  sql: "select u.name as _id,u.id from user u" # the query against the corresponding MySQL table
  commitBatch: 1

  5. Create the adapter mapping file

Mine is under the rdb folder and is named mytest_user.yml:

dataSourceKey: defaultDS # matches srcDataSources in application.yml above
destination: canal-example # name of the RabbitMQ queue
groupId: g1 # keep consistent with the above
outerAdapterKey: canal-routing-key # the RabbitMQ Routing Key
concurrent: true
dbMapping:
  database: mytest
  table: user
  targetTable: mytest2.user
  targetPk:
    id: id
  mapAll: true # I set this to true because my queries pull every column; adjust as needed
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
  etlCondition: "where c_time>={}"
  commitBatch: 3000 # batch commit size


## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest

  6. Create the ES index in Kibana, written according to your own needs:

PUT /article
{
  "mappings": {
    "properties": {
      "id" : {
        "type": "integer"
      },
      "user_id":{
        "type": "integer"
      }  
    }
  }
}

  7. Start canal.adapter

RabbitMQ

  1. Create an exchange; I chose direct as the Type.

  2. Create a queue and bind it to that exchange with the Routing Key (this can also be done from code, see the sketch below).
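If you would rather declare the exchange, queue, and binding from code than from the management UI, here is a minimal sketch with the RabbitMQ Java client, reusing the queue and Routing Key names from the configs above; the exchange name canal-exchange and the credentials are placeholders:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DeclareCanalBindings {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1"); // your RabbitMQ host
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // durable direct exchange
            channel.exchangeDeclare("canal-exchange", "direct", true);
            // durable, non-exclusive, non-auto-delete queue
            channel.queueDeclare("canal-example", true, false, false, null);
            // bind the queue to the exchange with the Routing Key configured in Canal
            channel.queueBind("canal-example", "canal-exchange", "canal-routing-key");
        }
    }
}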

Spring Boot code

RabbitMQ connection pool: connection

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitmqConnection {

    private Connection connection;

    public RabbitmqConnection(String host, int port, String userName, String password, String virtualhost) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get the connection
     *
     * @return the underlying RabbitMQ connection
     */
    public Connection getConnection() {
        return connection;
    }

    /**
     * Close the connection
     *
     */
    public void close() {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

connectionPool

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RabbitmqConnectionPool {

    private static BlockingQueue<RabbitmqConnection> pool;

    public static void initRabbitmqConnectionPool(String host, int port, String userName, String password,
                                                  String virtualhost, Integer poolSize) {
        pool = new LinkedBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new RabbitmqConnection(host, port, userName, password, virtualhost));
        }
    }

    public static RabbitmqConnection getConnection() throws InterruptedException {
        return pool.take();
    }


    public static void returnConnection(RabbitmqConnection connection) {
        pool.add(connection);
    }

    public static void close() {
        pool.forEach(RabbitmqConnection::close);
    }
}
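A quick usage sketch of the pool (host and credentials are placeholders):

public class PoolUsageExample {
    public static void main(String[] args) throws InterruptedException {
        // initialize once at startup
        RabbitmqConnectionPool.initRabbitmqConnectionPool("127.0.0.1", 5672, "guest", "guest", "/", 5);

        RabbitmqConnection conn = RabbitmqConnectionPool.getConnection();
        try {
            // use conn.getConnection() to create channels, declare queues, consume, etc.
        } finally {
            // always hand the connection back so the pool does not drain
            RabbitmqConnectionPool.returnConnection(conn);
        }

        RabbitmqConnectionPool.close(); // on shutdown
    }
}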

Create a consumer to consume the messages

@Slf4j
@Service
public class CanalMQServiceImpl implements CanalMQService {

    @Override
    public void consumeCanalMsg(String exchange,
                                String queueName,
                                String routingKey) {

        try {
            // get a pooled connection
            RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
            Connection connection = rabbitmqConnection.getConnection();
            // create a channel
            final Channel channel = connection.createChannel();
            // declare the queue
            channel.queueDeclare(queueName, true, false, false, null);
            // bind the queue to the exchange
            channel.queueBind(queueName, exchange, routingKey);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    log.info("Consumer canal msg: {}", message);

                    // read the RabbitMQ message and mirror it into Redis
                    CanalMessage<Map<String, String>> canalMessage = JsonUtil.toObj(message, CanalMessage.class);
                    String cacheKey = "canal_rabbitmq_"
                            + canalMessage.getId();

                    if("INSERT".equalsIgnoreCase(canalMessage.getType())){
                        RedisClient.setStr(cacheKey, canalMessage.getData().get(0).get("id"));
                    }else if("UPDATE".equalsIgnoreCase(canalMessage.getType()) || "DELETE".equalsIgnoreCase(canalMessage.getType())){
                        RedisClient.del(cacheKey);
                    }

                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // manual ack (auto-ack disabled)
            channel.basicConsume(queueName, false, consumer);
            // channel.close();
            RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void processConsumerCanalMsg() {
        log.info("Begin to processConsumerCanalMsg.");

        Integer stepTotal = 1;
        Integer step = 0;

        while (true) {
            step++;
            try {
                log.info("processConsumerCanalMsg cycle.");
                consumeCanalMsg(CanalConstant.CANALEXCHANGE, CanalConstant.CANALQUEUE,
                        CanalConstant.CANALROUTING); // the exchange, queue and routing key configured in RabbitMQ
                if (step.equals(stepTotal)) {
                    Thread.sleep(10000);
                    step = 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
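The CanalConstant class referenced above is not shown; a minimal sketch consistent with the names used in the earlier configs (the exchange name is an assumption, use whatever you created in RabbitMQ):

public class CanalConstant {
    // exchange created in RabbitMQ (assumed name)
    public static final String CANALEXCHANGE = "canal-exchange";
    // queue name, matching instance/destination in the adapter configs
    public static final String CANALQUEUE = "canal-example";
    // Routing Key, matching canal.mq.topic and outerAdapterKey
    public static final String CANALROUTING = "canal-routing-key";
}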

Create a thread that continuously fetches messages:

@Configuration
@ConditionalOnProperty(value = "rabbitmq.switchFlag")
@EnableConfigurationProperties(RabbitmqProperties.class)
public class RabbitMqAutoConfig implements ApplicationRunner {

    @Autowired
    private RabbitmqProperties rabbitmqProperties;

    @Resource
    private CanalMQService canalMQService;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        String host = rabbitmqProperties.getHost();
        Integer port = rabbitmqProperties.getPort();
        String userName = rabbitmqProperties.getUsername();
        String password = rabbitmqProperties.getPassword();
        String virtualhost = rabbitmqProperties.getVirtualhost();
        Integer poolSize = rabbitmqProperties.getPoolSize();
        RabbitmqConnectionPool.initRabbitmqConnectionPool(host, port, userName, password, virtualhost, poolSize);
        AsyncUtil.execute(() -> canalMQService.processConsumerCanalMsg());
    }
}
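RabbitmqProperties is not shown either; here is a sketch matching the getters used above (field names are inferred from those getters, and the rabbitmq prefix is an assumption):

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "rabbitmq")
public class RabbitmqProperties {
    private Boolean switchFlag; // toggles RabbitMqAutoConfig via @ConditionalOnProperty
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualhost;
    private Integer poolSize;
}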

Create an entity class matching the data Canal publishes to RabbitMQ (you can run the program first, grab one message from RabbitMQ, and generate the class from it, e.g. with an AI assistant):

import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

@NoArgsConstructor
@Data
public class CanalMessage<T> {
    private List<T> data;
    private String database;
    private String destination;
    private Long es;
    private String groupId;
    private Boolean isDdl;
    private Map<String,String> mysqlType;
    private List<T> old;
    private List<String> pkNames;
    private String sql;
    private Map<String,Integer> sqlType;
    private String table;
    private Long ts;
    private String type;
    private Integer id;
}
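The JsonUtil helper used in the consumer is not shown; here is a hedged sketch of parsing a raw Canal message into this entity with Jackson (Jackson is an assumption, substitute whatever JSON library the project actually uses):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class CanalMessageParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // each row in "data"/"old" becomes a column-name -> value map
    public static CanalMessage<Map<String, String>> parse(String json) throws Exception {
        return MAPPER.readValue(json, new TypeReference<CanalMessage<Map<String, String>>>() {});
    }
}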

At this point, the program should run successfully.

Consumer canal msg: {"data":[{"id":"18","host":"192.168.XX.XXX","cnt":"121","date":"2024-06-01","create_time":"2024-06-01 11:28:05","update_time":"2024-06-01 21:38:11"}],"database":"DB","es":1717249091000,"id":177,"isDdl":false,"mysqlType":{"id":"bigint unsigned","host":"varchar(32)","cnt":"int unsigned","date":"date","create_time":"timestamp","update_time":"timestamp"},"old":[{"cnt":"120","update_time":"2024-06-01 21:38:08"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"host":12,"cnt":4,"date":91,"create_time":93,"update_time":93},"table":"request_count","ts":1717249122683,"type":"UPDATE"}

The console should print a message like the one above, which means synchronization is working. The code given here only implements the basics; in practice you can branch on the message type or on the table name and handle each case differently, for example as in the sketch below. Feel free to share your own approaches. That's it~~~
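A minimal sketch of dispatching on the table name and event type; it reuses the RedisClient helper from the consumer above and the request_count table from the sample message, and the cache values chosen here are just illustrative:

import java.util.Map;

public class CanalMessageDispatcher {

    // route a parsed Canal message by table and event type
    public static void dispatch(CanalMessage<Map<String, String>> msg) {
        if (!"request_count".equals(msg.getTable())) {
            return; // only handle the tables we care about
        }
        switch (msg.getType().toUpperCase()) {
            case "INSERT":
            case "UPDATE":
                // refresh the cache entry for every changed row
                msg.getData().forEach(row ->
                        RedisClient.setStr("canal_rabbitmq_" + row.get("id"), row.get("cnt")));
                break;
            case "DELETE":
                msg.getData().forEach(row ->
                        RedisClient.del("canal_rabbitmq_" + row.get("id")));
                break;
            default:
                // ignore DDL and other event types
                break;
        }
    }
}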
