|
@@ -0,0 +1,178 @@
|
|
|
+package com.retdata.yaoyibi.canal;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import cn.hutool.core.map.MapUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
+import com.alibaba.otter.canal.client.CanalConnectors;
|
|
|
+import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
|
|
|
+import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
|
|
|
+import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
|
+import com.alibaba.otter.canal.protocol.Message;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.retdata.yaoyibi.canal.handler.AbstractCanalDbHandler;
|
|
|
+import com.retdata.yaoyibi.domain.*;
|
|
|
+import com.retdata.yaoyibi.service.*;
|
|
|
+import com.ruoyi.common.enums.DataSourceType;
|
|
|
+import com.ruoyi.framework.datasource.DynamicDataSourceContextHolder;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.InitializingBean;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class CannalClient implements InitializingBean {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private CanalConfig canalConfig;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private Map<String, AbstractCanalDbHandler> canalDbHandlerMap = new HashMap<>();
|
|
|
+
|
|
|
+ private Map<String, AbstractCanalDbHandler> tableNameHandlerMap = new HashMap<>();
|
|
|
+
|
|
|
+ private final static int BATCH_SIZE = 1000;
|
|
|
+
|
|
|
+ private final static long LOCAL_THREAD_SLEEP = 2000L;
|
|
|
+
|
|
|
+ private List<CanalConnector> canalConnectors= new ArrayList<>();
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void initialize() {
|
|
|
+ for (Map.Entry<String, AbstractCanalDbHandler> entry : canalDbHandlerMap.entrySet()) {
|
|
|
+ tableNameHandlerMap.put(entry.getValue().tableName(), entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void stop(){
|
|
|
+ if(CollUtil.isNotEmpty(canalConnectors)){
|
|
|
+ for(CanalConnector canalConnector: canalConnectors){
|
|
|
+ if (canalConnector instanceof ClusterCanalConnector) {
|
|
|
+ ((ClusterCanalConnector) canalConnector).stopRunning();
|
|
|
+ } else if (canalConnector instanceof SimpleCanalConnector) {
|
|
|
+ ((SimpleCanalConnector) canalConnector).stopRunning();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterPropertiesSet() throws Exception {
|
|
|
+ if (canalConfig != null && MapUtil.isNotEmpty(canalConfig.getSourceConfig())) {
|
|
|
+ for (DataSourceEnum dataSourceEnum : canalConfig.getSourceConfig().keySet()) {
|
|
|
+ CanalConfig.SourceConfig sourceConfig = canalConfig.getSourceConfig().get(dataSourceEnum);
|
|
|
+ if (!sourceConfig.isEnabled()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Thread t = new Thread(() -> {
|
|
|
+ while (true) {
|
|
|
+ // 创建链接
|
|
|
+ CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(sourceConfig.getServerHost(), sourceConfig.getServerPort()), sourceConfig.getInstance(), sourceConfig.getUsername(), sourceConfig.getPassword());
|
|
|
+ canalConnectors.add(connector);
|
|
|
+ try {
|
|
|
+ //打开连接
|
|
|
+ connector.connect();
|
|
|
+ //订阅数据库表,全部表
|
|
|
+ connector.subscribe(sourceConfig.getSubscribe());
|
|
|
+ //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
|
|
|
+ connector.rollback();
|
|
|
+ while (true) {
|
|
|
+ // 获取指定数量的数据
|
|
|
+ Message message = connector.getWithoutAck(BATCH_SIZE);
|
|
|
+ //获取批量ID
|
|
|
+ long batchId = message.getId();
|
|
|
+ //获取批量的数量
|
|
|
+ int size = message.getEntries().size();
|
|
|
+
|
|
|
+ //如果没有数据
|
|
|
+ if (batchId == -1 || size == 0) {
|
|
|
+ try {
|
|
|
+ //线程休眠2秒
|
|
|
+ Thread.sleep(LOCAL_THREAD_SLEEP);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //如果有数据,处理数据
|
|
|
+ writeOut(dataSourceEnum, message);
|
|
|
+ }
|
|
|
+ //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
|
|
|
+ connector.ack(batchId);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ } finally {
|
|
|
+ connector.disconnect();
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ Thread.sleep(sourceConfig.getInterval());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ t.start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 打印canal server解析binlog获得的实体类信息
|
|
|
+ */
|
|
|
+ private void writeOut(DataSourceEnum dataSourceEnum, Message message) {
|
|
|
+ List<CanalEntry.Entry> entrys = message.getEntries();
|
|
|
+ for (CanalEntry.Entry entry : entrys) {
|
|
|
+ if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
|
+ //开启/关闭事务的实体类型,跳过
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ //RowChange对象,包含了一行数据变化的所有特征
|
|
|
+ //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
|
|
|
+ CanalEntry.RowChange rowChage;
|
|
|
+ try {
|
|
|
+ rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
|
|
|
+ }
|
|
|
+ //获取操作类型:insert/update/delete类型
|
|
|
+ CanalEntry.EventType eventType = rowChage.getEventType();
|
|
|
+
|
|
|
+ //判断是否是DDL语句
|
|
|
+ if (rowChage.getIsDdl()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String tableName = entry.getHeader().getTableName();
|
|
|
+
|
|
|
+ //获取RowChange对象里的每一行数据,打印出来
|
|
|
+ for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
|
|
|
+ AbstractCanalDbHandler canalDbHandler = tableNameHandlerMap.get(tableName);
|
|
|
+ if (canalDbHandler == null) {
|
|
|
+ log.warn("{}没有对应的处理器", tableName);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.CSO.name());
|
|
|
+ canalDbHandler.processRow(rowData, eventType, dataSourceEnum);
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("处理数据异常", e);
|
|
|
+ } finally {
|
|
|
+ DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|