|
@@ -25,10 +25,15 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
@Component
|
|
@Component
|
|
-public class CannalClient implements InitializingBean {
|
|
|
|
|
|
+public class CannalClient {
|
|
|
|
+
|
|
|
|
+ static {
|
|
|
|
+ log.info("CanalClient类已加载");
|
|
|
|
+ }
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
private CanalConfig canalConfig;
|
|
private CanalConfig canalConfig;
|
|
@@ -41,39 +46,86 @@ public class CannalClient implements InitializingBean {
|
|
|
|
|
|
private Map<String, AbstractCanalDbHandler> tableNameHandlerMap = new HashMap<>();
|
|
private Map<String, AbstractCanalDbHandler> tableNameHandlerMap = new HashMap<>();
|
|
|
|
|
|
- private final static int BATCH_SIZE = 1000;
|
|
|
|
|
|
+ private final static int BATCH_SIZE = 500;
|
|
|
|
|
|
private final static long LOCAL_THREAD_SLEEP = 2000L;
|
|
private final static long LOCAL_THREAD_SLEEP = 2000L;
|
|
|
|
|
|
- private Map<DataSourceEnum, CanalConnector> canalConnectors= new HashMap<>(2);
|
|
|
|
|
|
+ private Map<DataSourceEnum, CanalConnector> canalConnectors= new HashMap<>(DataSourceEnum.values().length);
|
|
|
|
+ private final Map<DataSourceEnum, Thread> canalThreads = new HashMap<>(DataSourceEnum.values().length);
|
|
|
|
|
|
@PostConstruct
|
|
@PostConstruct
|
|
public void initialize() {
|
|
public void initialize() {
|
|
|
|
+ log.info("CanalClient开始初始化,注册的处理器数量: {}", canalDbHandlerMap.size());
|
|
for (Map.Entry<String, AbstractCanalDbHandler> entry : canalDbHandlerMap.entrySet()) {
|
|
for (Map.Entry<String, AbstractCanalDbHandler> entry : canalDbHandlerMap.entrySet()) {
|
|
tableNameHandlerMap.put(entry.getValue().tableName(), entry.getValue());
|
|
tableNameHandlerMap.put(entry.getValue().tableName(), entry.getValue());
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ start();
|
|
|
|
+ }catch (Exception e){
|
|
|
|
+ log.error("CanalClient初始化异常", e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.info("CanalClient初始化完成");
|
|
}
|
|
}
|
|
|
|
|
|
@PreDestroy
|
|
@PreDestroy
|
|
public void stop(){
|
|
public void stop(){
|
|
- if(CollUtil.isNotEmpty(canalConnectors)){
|
|
|
|
- for(Map.Entry<DataSourceEnum, CanalConnector> entry: canalConnectors.entrySet()){
|
|
|
|
- CanalConnector canalConnector = entry.getValue();
|
|
|
|
- if (canalConnector instanceof ClusterCanalConnector) {
|
|
|
|
- ((ClusterCanalConnector) canalConnector).stopRunning();
|
|
|
|
- } else if (canalConnector instanceof SimpleCanalConnector) {
|
|
|
|
- ((SimpleCanalConnector) canalConnector).stopRunning();
|
|
|
|
|
|
+ log.info("开始停止CanalClient...");
|
|
|
|
+
|
|
|
|
+ // 1. 设置停止标志
|
|
|
|
+ running = false;
|
|
|
|
+
|
|
|
|
+ // 2. 等待当前正在处理的数据完成
|
|
|
|
+ log.info("等待当前数据处理完成...");
|
|
|
|
+ try {
|
|
|
|
+ // 给更多时间让当前数据处理完成
|
|
|
|
+ Thread.sleep(10000); // 10秒
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ log.warn("等待数据处理完成时被中断", e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 3. 关闭Canal连接
|
|
|
|
+ log.info("开始关闭Canal连接...");
|
|
|
|
+ if (CollUtil.isNotEmpty(canalConnectors)) {
|
|
|
|
+ for (Map.Entry<DataSourceEnum, CanalConnector> entry : canalConnectors.entrySet()) {
|
|
|
|
+ try {
|
|
|
|
+ CanalConnector canalConnector = entry.getValue();
|
|
|
|
+ if (canalConnector instanceof ClusterCanalConnector) {
|
|
|
|
+ ((ClusterCanalConnector) canalConnector).stopRunning();
|
|
|
|
+ } else if (canalConnector instanceof SimpleCanalConnector) {
|
|
|
|
+ ((SimpleCanalConnector) canalConnector).stopRunning();
|
|
|
|
+ }
|
|
|
|
+ log.info("Canal连接已关闭: {}", entry.getKey().name());
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("关闭Canal连接时发生异常: {}", entry.getKey().name(), e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- running = false;
|
|
|
|
- executor.shutdownNow();
|
|
|
|
- log.warn("CannalClient 已销毁");
|
|
|
|
|
|
+ // 4. 优雅关闭线程池
|
|
|
|
+ log.info("开始关闭线程池...");
|
|
|
|
+ try {
|
|
|
|
+ executor.shutdown();
|
|
|
|
+ // 等待线程池中的任务完成,最多等待30秒
|
|
|
|
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
|
|
|
|
+ log.warn("线程池未能在30秒内完成,强制关闭");
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
+ // 再等待5秒
|
|
|
|
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
|
|
|
|
+ log.error("线程池无法关闭");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ log.error("关闭线程池时发生异常", e);
|
|
|
|
+ executor.shutdownNow();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.warn("CanalClient 已销毁");
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public void afterPropertiesSet() throws Exception {
|
|
|
|
|
|
+ public void start() throws Exception {
|
|
|
|
+ log.info("CanalClient开始启动,配置信息: {}", canalConfig);
|
|
if (canalConfig != null && MapUtil.isNotEmpty(canalConfig.getSourceConfig())) {
|
|
if (canalConfig != null && MapUtil.isNotEmpty(canalConfig.getSourceConfig())) {
|
|
for (DataSourceEnum dataSourceEnum : canalConfig.getSourceConfig().keySet()) {
|
|
for (DataSourceEnum dataSourceEnum : canalConfig.getSourceConfig().keySet()) {
|
|
CanalConfig.SourceConfig sourceConfig = canalConfig.getSourceConfig().get(dataSourceEnum);
|
|
CanalConfig.SourceConfig sourceConfig = canalConfig.getSourceConfig().get(dataSourceEnum);
|
|
@@ -82,66 +134,108 @@ public class CannalClient implements InitializingBean {
|
|
}
|
|
}
|
|
|
|
|
|
executor.submit(()-> {
|
|
executor.submit(()-> {
|
|
- while (running){
|
|
|
|
- try {
|
|
|
|
- while (true) {
|
|
|
|
- // 创建链接
|
|
|
|
- CanalConnector connector = null;
|
|
|
|
- if(CollUtil.isNotEmpty(canalConnectors)
|
|
|
|
- && canalConnectors.containsKey(dataSourceEnum)
|
|
|
|
- ){
|
|
|
|
- connector = canalConnectors.get(dataSourceEnum);
|
|
|
|
- }else {
|
|
|
|
- connector = CanalConnectors.newSingleConnector(new InetSocketAddress(sourceConfig.getServerHost(), sourceConfig.getServerPort()), sourceConfig.getInstance(), sourceConfig.getUsername(), sourceConfig.getPassword());
|
|
|
|
-
|
|
|
|
- canalConnectors.put(dataSourceEnum, connector);
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- //打开连接
|
|
|
|
- connector.connect();
|
|
|
|
- //订阅数据库表,全部表
|
|
|
|
- connector.subscribe(sourceConfig.getSubscribe());
|
|
|
|
- //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
|
|
|
|
- connector.rollback();
|
|
|
|
- while (true) {
|
|
|
|
- // 获取指定数量的数据
|
|
|
|
- Message message = connector.getWithoutAck(BATCH_SIZE);
|
|
|
|
- //获取批量ID
|
|
|
|
- long batchId = message.getId();
|
|
|
|
- //获取批量的数量
|
|
|
|
- int size = message.getEntries().size();
|
|
|
|
-
|
|
|
|
- //如果没有数据
|
|
|
|
- if (batchId == -1 || size == 0) {
|
|
|
|
- try {
|
|
|
|
- //线程休眠2秒
|
|
|
|
- Thread.sleep(LOCAL_THREAD_SLEEP);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
|
|
+ Thread currentThread = Thread.currentThread();
|
|
|
|
+ canalThreads.put(dataSourceEnum, currentThread);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ log.info("Canal线程启动,数据源: {}, 线程ID: {}, 线程名: {}",
|
|
|
|
+ dataSourceEnum.name(),
|
|
|
|
+ currentThread.getId(),
|
|
|
|
+ currentThread.getName());
|
|
|
|
+ while (running) {
|
|
|
|
+ try {
|
|
|
|
+ while (running) {
|
|
|
|
+ // 创建链接
|
|
|
|
+ CanalConnector connector = null;
|
|
|
|
+ if (CollUtil.isNotEmpty(canalConnectors)
|
|
|
|
+ && canalConnectors.containsKey(dataSourceEnum)
|
|
|
|
+ ) {
|
|
|
|
+ connector = canalConnectors.get(dataSourceEnum);
|
|
|
|
+ } else {
|
|
|
|
+ connector = CanalConnectors.newSingleConnector(new InetSocketAddress(sourceConfig.getServerHost(), sourceConfig.getServerPort()), sourceConfig.getInstance(), sourceConfig.getUsername(), sourceConfig.getPassword());
|
|
|
|
+
|
|
|
|
+ canalConnectors.put(dataSourceEnum, connector);
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ //打开连接
|
|
|
|
+ connector.connect();
|
|
|
|
+ //订阅数据库表,全部表
|
|
|
|
+ connector.subscribe(sourceConfig.getSubscribe());
|
|
|
|
+ //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
|
|
|
|
+ connector.rollback();
|
|
|
|
+
|
|
|
|
+ while (true) {
|
|
|
|
+ // 检查应用是否正在关闭
|
|
|
|
+ if (!running) {
|
|
|
|
+ log.info("应用正在关闭,停止获取数据,数据源: {}", dataSourceEnum.name());
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- //如果有数据,处理数据
|
|
|
|
- writeOut(dataSourceEnum, message);
|
|
|
|
|
|
+
|
|
|
|
+ // 获取指定数量的数据
|
|
|
|
+ Message message = connector.getWithoutAck(BATCH_SIZE);
|
|
|
|
+ //获取批量ID
|
|
|
|
+ long batchId = message.getId();
|
|
|
|
+ //获取批量的数量
|
|
|
|
+ int size = message.getEntries().size();
|
|
|
|
+
|
|
|
|
+ //如果没有数据
|
|
|
|
+ if (batchId == -1 || size == 0) {
|
|
|
|
+ try {
|
|
|
|
+ log.warn("没有数据");
|
|
|
|
+ //线程休眠2秒
|
|
|
|
+ Thread.sleep(LOCAL_THREAD_SLEEP);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ log.info("数据源:{},数据:{}", dataSourceEnum, size);
|
|
|
|
+ //如果有数据,处理数据
|
|
|
|
+ writeOut(dataSourceEnum, message);
|
|
|
|
+ }
|
|
|
|
+ log.info("connector.ack({});", batchId);
|
|
|
|
+ //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
|
|
|
|
+ connector.ack(batchId);
|
|
}
|
|
}
|
|
- //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
|
|
|
|
- connector.ack(batchId);
|
|
|
|
|
|
+ } catch (OutOfMemoryError e) {
|
|
|
|
+ log.error("内存不足错误,数据源: {}", dataSourceEnum.name(), e);
|
|
|
|
+ // 内存不足,可能需要重启应用
|
|
|
|
+ break;
|
|
|
|
+ } catch (StackOverflowError e) {
|
|
|
|
+ log.error("栈溢出错误,数据源: {}", dataSourceEnum.name(), e);
|
|
|
|
+ break;
|
|
|
|
+ } catch (ThreadDeath e) {
|
|
|
|
+ log.error("线程被强制终止,数据源: {}", dataSourceEnum.name(), e);
|
|
|
|
+ break;
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("CanalClient应用程序异常,数据源: {}", dataSourceEnum.name(), e);
|
|
|
|
+ // 应用程序异常,可以尝试恢复
|
|
|
|
+ } catch (Error e) {
|
|
|
|
+ log.error("CanalClient系统级错误,数据源: {}", dataSourceEnum.name(), e);
|
|
|
|
+ // 系统级错误,重新抛出
|
|
|
|
+ throw e;
|
|
|
|
+ } finally {
|
|
|
|
+ connector.disconnect();
|
|
|
|
+ log.info("{} 连接已断开", dataSourceEnum);
|
|
}
|
|
}
|
|
- } catch (Exception e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- } finally {
|
|
|
|
- connector.disconnect();
|
|
|
|
- DynamicDataSourceContextHolder.clearDataSourceType();
|
|
|
|
- }
|
|
|
|
|
|
|
|
- try {
|
|
|
|
- Thread.sleep(sourceConfig.getInterval());
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
|
|
+ try {
|
|
|
|
+ Thread.sleep(sourceConfig.getInterval());
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException(e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("CanalClient error", e);
|
|
}
|
|
}
|
|
- } catch (Exception e) {
|
|
|
|
- log.error("CanalClient error", e);
|
|
|
|
}
|
|
}
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("Canal线程异常退出,数据源: {}, 线程ID: {}, 异常: {}",
|
|
|
|
+ dataSourceEnum.name(),
|
|
|
|
+ currentThread.getId(),
|
|
|
|
+ e.getMessage(),
|
|
|
|
+ e);
|
|
|
|
+ } finally {
|
|
|
|
+ canalThreads.remove(dataSourceEnum);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
@@ -152,9 +246,18 @@ public class CannalClient implements InitializingBean {
|
|
* 打印canal server解析binlog获得的实体类信息
|
|
* 打印canal server解析binlog获得的实体类信息
|
|
*/
|
|
*/
|
|
private void writeOut(DataSourceEnum dataSourceEnum, Message message) {
|
|
private void writeOut(DataSourceEnum dataSourceEnum, Message message) {
|
|
|
|
+ // 检查应用是否正在关闭
|
|
|
|
+ if (!running) {
|
|
|
|
+ log.warn("应用正在关闭,跳过数据处理");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
List<CanalEntry.Entry> entrys = message.getEntries();
|
|
List<CanalEntry.Entry> entrys = message.getEntries();
|
|
- for (CanalEntry.Entry entry : entrys) {
|
|
|
|
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
|
|
|
|
+ log.info("writeOut entrys size:{}", entrys.size());
|
|
|
|
+
|
|
|
|
+ for (int i=0;i< entrys.size();i++) {
|
|
|
|
+ CanalEntry.Entry entry = entrys.get(i);
|
|
|
|
+ if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|
|
|
|
+ || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
|
|
//开启/关闭事务的实体类型,跳过
|
|
//开启/关闭事务的实体类型,跳过
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
@@ -169,6 +272,12 @@ public class CannalClient implements InitializingBean {
|
|
//获取操作类型:insert/update/delete类型
|
|
//获取操作类型:insert/update/delete类型
|
|
CanalEntry.EventType eventType = rowChage.getEventType();
|
|
CanalEntry.EventType eventType = rowChage.getEventType();
|
|
|
|
|
|
|
|
+ if (eventType != CanalEntry.EventType.INSERT
|
|
|
|
+ && eventType != CanalEntry.EventType.UPDATE
|
|
|
|
+ && eventType != CanalEntry.EventType.DELETE) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
//判断是否是DDL语句
|
|
//判断是否是DDL语句
|
|
if (rowChage.getIsDdl()) {
|
|
if (rowChage.getIsDdl()) {
|
|
continue;
|
|
continue;
|
|
@@ -179,19 +288,33 @@ public class CannalClient implements InitializingBean {
|
|
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
|
|
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
|
|
AbstractCanalDbHandler canalDbHandler = tableNameHandlerMap.get(tableName);
|
|
AbstractCanalDbHandler canalDbHandler = tableNameHandlerMap.get(tableName);
|
|
if (canalDbHandler == null) {
|
|
if (canalDbHandler == null) {
|
|
- log.warn("{}没有对应的处理器", tableName);
|
|
|
|
|
|
+ log.warn("{} 没有对应的处理器", tableName);
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ log.info("{} 开始处理", tableName);
|
|
try {
|
|
try {
|
|
|
|
+ // 再次检查应用状态
|
|
|
|
+ if (!running) {
|
|
|
|
+ log.warn("应用正在关闭,跳过处理: {}", tableName);
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.CSO.name());
|
|
DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.CSO.name());
|
|
canalDbHandler.processRow(rowData, eventType, dataSourceEnum);
|
|
canalDbHandler.processRow(rowData, eventType, dataSourceEnum);
|
|
}catch (Exception e){
|
|
}catch (Exception e){
|
|
- log.error("处理数据异常", e);
|
|
|
|
|
|
+ if (!running) {
|
|
|
|
+ log.warn("应用正在关闭,忽略异常: {}", tableName);
|
|
|
|
+ break;
|
|
|
|
+ } else {
|
|
|
|
+ log.error("处理数据异常", e);
|
|
|
|
+ }
|
|
} finally {
|
|
} finally {
|
|
|
|
+ log.info("{} 结束处理", tableName);
|
|
|
|
+
|
|
DynamicDataSourceContextHolder.clearDataSourceType();
|
|
DynamicDataSourceContextHolder.clearDataSourceType();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ log.info("writeOut 处理完成");
|
|
}
|
|
}
|
|
}
|
|
}
|