package com.zhidian.cloud.canal.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.zhidian.cloud.canal.model.Canal;
import com.zhidian.cloud.canal.model.Pipeline;
import com.zhidian.cloud.canal.storage.CanalStorage;
import com.zhidian.cloud.canal.storage.PipelineStorage;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/* loaded from: input_file:BOOT-INF/classes/com/zhidian/cloud/canal/canal/CanalDispatcher.class */
/**
 * Builds and starts one {@link CanalMessageHandler} (with a single
 * {@link CanalRowProcessor}) for every enabled {@link Canal} returned by the
 * configured {@link CanalStorage}, and keeps references to them so they can be
 * looked up or have their pipelines refreshed later.
 *
 * <p>The collaborators ({@code canalStorage}, {@code pipelineStorage},
 * {@code canalSinker}) are injected through setters and must be set before
 * {@link #init()} is called.
 *
 * <p>NOTE(review): the handler/processor lists are plain {@link ArrayList}s with
 * no synchronization in this class; confirm callers do not invoke
 * {@link #init()}, {@link #getHandler(String)} and {@link #refreshProcessors()}
 * concurrently.
 */
public class CanalDispatcher {
    CanalStorage canalStorage;
    PipelineStorage pipelineStorage;
    SimpleCanalSinker canalSinker;

    /** Handlers created by {@link #init()}, one per enabled canal. */
    List<CanalMessageHandler> handlers = new ArrayList<>();

    /** Row processors created by {@link #init()}, one per enabled canal. */
    List<CanalRowProcessor> processors = new ArrayList<>();

    /**
     * Creates and starts a message handler for every enabled canal, wiring each
     * one to the configured sinker.
     */
    public void init() {
        startupCanalMessageHandler(this.canalSinker);
    }

    /**
     * Iterates over all canals from {@code canalStorage}; for each enabled one,
     * creates a single connector, a row processor bound to the canal's pipelines
     * and the given sinker, and a message handler, registers both in this
     * dispatcher and starts the handler.
     *
     * @param simpleCanalSinker sinker handed to every created row processor
     */
    private void startupCanalMessageHandler(SimpleCanalSinker simpleCanalSinker) {
        for (Canal canal : this.canalStorage.list()) {
            // A missing (null) enabled flag is treated as "disabled" instead of
            // failing with a NullPointerException on unboxing, which would abort
            // start-up of all remaining canals.
            if (!Boolean.TRUE.equals(canal.getEnabled())) {
                continue;
            }

            InetSocketAddress address =
                    new InetSocketAddress(canal.getHost(), canal.getPort().intValue());
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    address, canal.getDestination(), canal.getUsername(), canal.getPassword());
            List<Pipeline> pipelines = this.pipelineStorage.listByCanalId(canal.getId());

            CanalRowProcessor processor = new CanalRowProcessor();
            processor.setCanal(canal);
            processor.setPipelines(pipelines);
            processor.setCanalSinker(simpleCanalSinker);
            this.processors.add(processor);

            CanalMessageHandler handler = new CanalMessageHandler();
            handler.setCanalId(canal.getId());
            handler.setProperties(canal);
            handler.setConnector(connector);
            handler.setProcessors(Arrays.asList(processor));
            // Registered before start(), so the handler stays in the list even
            // if start() throws (same ordering as before).
            this.handlers.add(handler);
            handler.start();
        }
    }

    /**
     * Looks up the handler that was created for the given canal id.
     *
     * @param str canal id to search for
     * @return the first registered handler whose canal id equals {@code str},
     *         or {@code null} if there is none
     */
    public CanalMessageHandler getHandler(String str) {
        for (CanalMessageHandler canalMessageHandler : this.handlers) {
            if (Objects.equals(canalMessageHandler.getCanalId(), str)) {
                return canalMessageHandler;
            }
        }
        return null;
    }

    /**
     * Re-reads the pipelines of every registered row processor's canal from
     * {@code pipelineStorage} and passes them to the processor's
     * {@code resetPipelines}.
     */
    public void refreshProcessors() {
        for (CanalRowProcessor canalRowProcessor : this.processors) {
            canalRowProcessor.resetPipelines(this.pipelineStorage.listByCanalId(canalRowProcessor.getCanal().getId()));
        }
    }

    /**
     * @param canalStorage source of the canal definitions read by {@link #init()}
     */
    public void setCanalStorage(CanalStorage canalStorage) {
        this.canalStorage = canalStorage;
    }

    /**
     * @param pipelineStorage source of the pipelines looked up per canal id
     */
    public void setPipelineStorage(PipelineStorage pipelineStorage) {
        this.pipelineStorage = pipelineStorage;
    }

    /**
     * @param simpleCanalSinker sinker passed to every row processor created by {@link #init()}
     */
    public void setCanalSinker(SimpleCanalSinker simpleCanalSinker) {
        this.canalSinker = simpleCanalSinker;
    }
}
