Browse Source

canal connector连接管理

baiying 1 week ago
parent
commit
03e3d5480d
1 changed files with 13 additions and 4 deletions
  1. 13 4
      yaoyi-bi/src/main/java/com/retdata/yaoyibi/canal/CannalClient.java

+ 13 - 4
yaoyi-bi/src/main/java/com/retdata/yaoyibi/canal/CannalClient.java

@@ -43,7 +43,7 @@ public class CannalClient implements InitializingBean {
 
     private final static long LOCAL_THREAD_SLEEP = 2000L;
 
-    private List<CanalConnector> canalConnectors= new ArrayList<>();
+    private Map<DataSourceEnum, CanalConnector> canalConnectors= new HashMap<>(2);
 
     @PostConstruct
     public void initialize() {
@@ -55,7 +55,8 @@ public class CannalClient implements InitializingBean {
     @PreDestroy
     public void stop(){
         if(CollUtil.isNotEmpty(canalConnectors)){
-            for(CanalConnector canalConnector: canalConnectors){
+            for(Map.Entry<DataSourceEnum, CanalConnector> entry: canalConnectors.entrySet()){
+                CanalConnector canalConnector = entry.getValue();
                 if (canalConnector instanceof ClusterCanalConnector) {
                     ((ClusterCanalConnector) canalConnector).stopRunning();
                 } else if (canalConnector instanceof SimpleCanalConnector) {
@@ -77,8 +78,16 @@ public class CannalClient implements InitializingBean {
                 Thread t = new Thread(() -> {
                     while (true) {
                         // 创建链接
-                        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(sourceConfig.getServerHost(), sourceConfig.getServerPort()), sourceConfig.getInstance(), sourceConfig.getUsername(), sourceConfig.getPassword());
-                        canalConnectors.add(connector);
+                        CanalConnector connector = null;
+                        if(CollUtil.isNotEmpty(canalConnectors)
+                            && canalConnectors.containsKey(dataSourceEnum)
+                        ){
+                            connector = canalConnectors.get(dataSourceEnum);
+                        }else {
+                            connector = CanalConnectors.newSingleConnector(new InetSocketAddress(sourceConfig.getServerHost(), sourceConfig.getServerPort()), sourceConfig.getInstance(), sourceConfig.getUsername(), sourceConfig.getPassword());
+
+                            canalConnectors.put(dataSourceEnum, connector);
+                        }
                         try {
                             //打开连接
                             connector.connect();