package com.zhidian.cloud.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.zhidian.cloud.canal.CanalServerConfigure;
import com.zhidian.cloud.common.logger.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/zhidian/cloud/canal/CanalClient.class */
/**
 * Client that connects to a canal server, pulls change messages on a background
 * daemon thread and dispatches each changed row to the registered insert / delete /
 * update handlers.
 *
 * <p>Typical usage: register at least one handler via {@link #onInsert}, {@link #onDelete}
 * or {@link #onUpdate}, then call {@link #startup()}; call {@link #stop()} to end monitoring.
 *
 * <p>Connector lifecycle and start/stop state are guarded by an internal lock. The handler
 * lists are plain {@link ArrayList}s that the worker thread iterates, so handlers should be
 * registered before {@link #startup()} is called.
 */
public class CanalClient {
    /** Maximum number of connection attempts made by a single {@link #reconnect()} call. */
    private static final int MAX_RECONNECT_ATTEMPTS = 6;
    /** Pause after each failed connection attempt, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MILLIS = 10000L;

    /** Prefix put in front of every log line; built from the configured destination. */
    private final String logTopic;
    private final CanalServerConfigure configure;
    private final Logger logger = Logger.getLogger(getClass(), CommonConstant.PROJECT_NAME);
    private final List<String> descs = new ArrayList<>();
    private final List<HandlerOnInsert> handlersOnInsert = new ArrayList<>();
    private final List<HandlerOnDelete> handlersOnDelete = new ArrayList<>();
    private final List<HandlerOnUpdate> handlersOnUpdate = new ArrayList<>();
    /** Written under {@link #lock}; volatile because the worker also reads it without the lock. */
    private volatile CanalConnector connector = null;
    /** The thread currently allowed to monitor; {@code null} while the client is stopped. */
    private volatile Thread monitorThread = null;
    private final Lock lock = new ReentrantLock();
    private final AtomicBoolean hasHandler = new AtomicBoolean(false);

    /**
     * Background task: connects, then keeps fetching and dispatching messages for as long as
     * the running thread is still the registered monitor thread and has not been interrupted.
     */
    private class Worker implements Runnable {

        @Override
        public void run() {
            final Thread self = Thread.currentThread();
            try {
                createCanalConnector();
                while (monitorThread == self && !self.isInterrupted()) {
                    CanalConnector current = connector;
                    if (current == null) {
                        // The initial connect or an earlier reconnect failed: retry explicitly
                        // instead of relying on a NullPointerException from get().
                        reconnect();
                        continue;
                    }
                    Message message;
                    try {
                        message = current.get(configure.getBatchSize(), Long.valueOf(configure.getTimeout()), TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        destroyCanalConnector();
                        logger.warn("{}canal 获取消息失败,准备重连!", new Object[]{logTopic, e});
                        reconnect();
                        continue;
                    }
                    try {
                        handleMessage(message);
                    } catch (Exception e) {
                        logger.warn("{}处理canal消息异常", new Object[]{logTopic, e});
                        pauseAfterFailure(self);
                    }
                }
            } finally {
                destroyCanalConnector();
                // Keep isStartup() truthful if the worker ended without stop() being called.
                unregister(self);
            }
        }

        /**
         * Backs off for the configured timeout after a message-handling failure.
         * Uses sleep rather than a monitor wait: nothing ever notifies this object, and
         * {@code wait(0)} would block forever if the timeout were configured as 0.
         */
        private void pauseAfterFailure(Thread self) {
            try {
                Thread.sleep(configure.getTimeout());
            } catch (InterruptedException interrupted) {
                logger.warn("{}canal监控线程interrupted,监控退出", new Object[]{logTopic, interrupted});
                // Restore the flag so the loop condition ends the worker, as the log states.
                self.interrupt();
            }
        }
    }

    /**
     * Creates a client for the given server configuration and registers a JVM shutdown
     * hook that disconnects the connector.
     * (Decompiler note: the original access level was package-private.)
     *
     * @param canalServerConfigure connection settings; its destination is used as log prefix
     */
    public CanalClient(CanalServerConfigure canalServerConfigure) {
        this.configure = canalServerConfigure;
        this.logTopic = "【" + canalServerConfigure.getDestination() + "】";
        Runtime.getRuntime().addShutdownHook(new Thread(this::destroyCanalConnector, "canal-client-shutdown-hook"));
    }

    /**
     * Registers a handler invoked for every inserted row; {@code null} is ignored.
     *
     * @return this client, for chaining
     */
    public CanalClient onInsert(HandlerOnInsert handlerOnInsert) {
        if (handlerOnInsert != null) {
            this.handlersOnInsert.add(handlerOnInsert);
            this.hasHandler.set(true);
        }
        return this;
    }

    /**
     * Registers a handler invoked for every deleted row; {@code null} is ignored.
     *
     * @return this client, for chaining
     */
    public CanalClient onDelete(HandlerOnDelete handlerOnDelete) {
        if (handlerOnDelete != null) {
            this.handlersOnDelete.add(handlerOnDelete);
            this.hasHandler.set(true);
        }
        return this;
    }

    /**
     * Registers a handler invoked for every updated row; {@code null} is ignored.
     *
     * @return this client, for chaining
     */
    public CanalClient onUpdate(HandlerOnUpdate handlerOnUpdate) {
        if (handlerOnUpdate != null) {
            this.handlersOnUpdate.add(handlerOnUpdate);
            this.hasHandler.set(true);
        }
        return this;
    }

    /**
     * Starts the daemon monitor thread. Calling it again while the client is already
     * started is a no-op: a second worker would share (and tear down) the same connector.
     *
     * @throws CanalClientException if no handler has been registered yet
     */
    public void startup() {
        check();
        this.lock.lock();
        try {
            if (this.monitorThread != null) {
                this.logger.info("{}canal client已经启动,忽略重复的startup()调用", new Object[]{this.logTopic});
                return;
            }
            Thread worker = new Thread(new Worker());
            worker.setDaemon(true);
            worker.setName("canal_connect_" + this.configure.getDestination());
            // Publish the thread before starting it: the worker loop compares against this field.
            this.monitorThread = worker;
            worker.start();
        } finally {
            this.lock.unlock();
        }
    }

    /** Fails fast when {@link #startup()} is called without any registered handler. */
    private void check() {
        if (!this.hasHandler.get()) {
            throw new CanalClientException("startup方法被调用前,必须至少指定一个handler");
        }
    }

    /**
     * Stops monitoring: unregisters the monitor thread and interrupts it so that blocking
     * calls and retry pauses end promptly. Does nothing if the client is not started.
     */
    public void stop() {
        this.lock.lock();
        try {
            Thread worker = this.monitorThread;
            if (worker != null) {
                this.monitorThread = null;
                worker.interrupt();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return {@code true} while a monitor thread is registered
     */
    public boolean isStartup() {
        this.lock.lock();
        try {
            return this.monitorThread != null;
        } finally {
            this.lock.unlock();
        }
    }

    /** Clears the monitor registration, but only if it still points at the given worker. */
    private void unregister(Thread worker) {
        this.lock.lock();
        try {
            if (this.monitorThread == worker) {
                this.monitorThread = null;
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Dispatches every row change contained in the message to the registered handlers.
     * Messages that are {@code null}, have id {@code -1} or carry no entries are ignored,
     * as are transaction begin/end entries.
     * (Decompiler note: the original access level was private.)
     *
     * @param message the batch fetched from the canal server; may be {@code null}
     */
    public void handleMessage(Message message) {
        if (message == null || message.getId() == -1) {
            return;
        }
        List<CanalEntry.Entry> entries = message.getEntries();
        if (entries == null || entries.isEmpty()) {
            return;
        }
        for (CanalEntry.Entry entry : entries) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            handleEntry(entry);
        }
    }

    /**
     * Parses one entry and feeds its rows to the handlers. Parse failures and handler
     * failures are logged and do not propagate; a handler failure abandons the remaining
     * rows of this entry only.
     */
    private void handleEntry(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            CanalEntry.Header header = entry.getHeader();
            String tableName = header.getTableName();
            if (tableName == null || tableName.trim().isEmpty()) {
                return;
            }
            String schemaName = header.getSchemaName();
            long executeTime = header.getExecuteTime();
            this.logger.debug(String.format("%scanal table name[%s,%s],eventType : %s", this.logTopic, schemaName, tableName, eventType));
            try {
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    dispatchRow(eventType, schemaName, tableName, rowData, executeTime);
                }
            } catch (Exception e) {
                this.logger.warn("{}handler处理时异常,table name[{},{}],eventType:{}", new Object[]{this.logTopic, schemaName, tableName, eventType, e});
            }
        } catch (Exception e) {
            this.logger.warn("{}ERROR ## parser of eromanga-event has an error,data:{}", new Object[]{this.logTopic, entry.toString(), e});
        }
    }

    /** Routes a single row to the handlers matching its event type; other event types are ignored. */
    private void dispatchRow(CanalEntry.EventType eventType, String schemaName, String tableName,
                             CanalEntry.RowData rowData, long executeTime) throws Exception {
        if (eventType == CanalEntry.EventType.INSERT) {
            for (HandlerOnInsert handler : this.handlersOnInsert) {
                handler.onInsert(schemaName, tableName, CanalUtil.getAfterIdValue(rowData), CanalUtil.getAfterColumnsFrom(rowData), executeTime);
            }
        } else if (eventType == CanalEntry.EventType.DELETE) {
            for (HandlerOnDelete handler : this.handlersOnDelete) {
                handler.onDelete(schemaName, tableName, CanalUtil.getBeforeIdValue(rowData), CanalUtil.getBeforeColumnsFrom(rowData), executeTime);
            }
        } else if (eventType == CanalEntry.EventType.UPDATE) {
            for (HandlerOnUpdate handler : this.handlersOnUpdate) {
                handler.onUpdate(schemaName, tableName, CanalUtil.getBeforeIdValue(rowData), CanalUtil.getAfterColumnsFrom(rowData), CanalUtil.getUpdatedColumns(rowData), executeTime);
            }
        }
    }

    /**
     * Ensures a valid connector exists: reuses the current one when {@code checkValid()}
     * passes, otherwise creates a single-node or cluster connector according to the
     * configuration, then connects, subscribes and rolls back.
     * (Decompiler note: the original access level was private.)
     *
     * @return {@code true} if a usable connector is available afterwards; {@code false} if
     *         the client is not started or the connection attempt failed
     */
    public boolean createCanalConnector() {
        if (!isStartup()) {
            this.logger.info("创建连接失败,因为客户端未启动,导致的原因有可能是:1.客户端没有启动,即还没有调用startup()方法;2.或者已经停止了,即已经调用了stop()方法!");
            return false;
        }
        String server = null;
        this.lock.lock();
        try {
            if (this.connector != null && this.connector.checkValid()) {
                return true;
            }
            CanalServerConfigure.ConnectorType type = this.configure.getConnectorType();
            if (type == CanalServerConfigure.ConnectorType.single) {
                server = this.configure.getIp() + ":" + this.configure.getPort();
                this.connector = CanalConnectors.newSingleConnector(new InetSocketAddress(this.configure.getIp(), this.configure.getPort()), this.configure.getDestination(), this.configure.getUsername(), this.configure.getPassword());
            } else if (type == CanalServerConfigure.ConnectorType.cluster) {
                server = this.configure.getZkServers();
                this.connector = CanalConnectors.newClusterConnector(this.configure.getZkServers(), this.configure.getDestination(), this.configure.getUsername(), this.configure.getPassword());
            } else {
                // Caught below: reported as a failed connection instead of a bare NPE.
                throw new IllegalStateException("unsupported canal connector type: " + type);
            }
            this.connector.connect();
            this.connector.subscribe();
            this.connector.rollback();
            this.logger.info("{}canal client连接server成功", new Object[]{this.logTopic});
            return true;
        } catch (Exception e) {
            // The password is deliberately not logged.
            this.logger.warn("{}canal创建连接失败. server={},username={},destination={}", new Object[]{this.logTopic, server, this.configure.getUsername(), this.configure.getDestination(), e});
            destroyCanalConnector();
            return false;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Drops the current connector and tries to connect again, up to
     * {@link #MAX_RECONNECT_ATTEMPTS} times with a {@link #RECONNECT_INTERVAL_MILLIS} pause
     * after each failure. Gives up early when the client is stopped or the calling thread
     * is interrupted (the interrupt status is restored for the caller).
     * (Decompiler note: the original access level was private.)
     */
    public void reconnect() {
        if (!isStartup()) {
            this.logger.info("重连失败,因为客户端未启动,导致的原因有可能是:1.客户端没有启动,即还没有调用startup()方法;2.或者已经停止了,即已经调用了stop()方法!");
            return;
        }
        destroyCanalConnector();
        boolean connected = false;
        for (int attempt = 1; attempt <= MAX_RECONNECT_ATTEMPTS && !connected; attempt++) {
            connected = createCanalConnector();
            this.logger.info("{}canal client 第{}次重连,结果={}", new Object[]{this.logTopic, attempt, connected});
            if (connected) {
                return;
            }
            if (!isStartup()) {
                // stop() was called in the meantime; further attempts cannot succeed.
                return;
            }
            try {
                Thread.sleep(RECONNECT_INTERVAL_MILLIS);
            } catch (InterruptedException interrupted) {
                this.logger.info("{}canal client 重连被中断,停止重连", new Object[]{this.logTopic});
                Thread.currentThread().interrupt();
                return;
            }
        }
        this.logger.warn("{}canal client 重连失败!", new Object[]{this.logTopic});
    }

    /**
     * Disconnects and forgets the current connector, if any. The reference is cleared
     * before disconnecting so a failing {@code disconnect()} cannot leave a stale
     * connector behind; disconnect failures are logged, not thrown.
     * (Decompiler note: the original access level was private.)
     */
    public void destroyCanalConnector() {
        this.lock.lock();
        try {
            CanalConnector stale = this.connector;
            if (stale == null) {
                return;
            }
            this.connector = null;
            stale.disconnect();
            this.logger.info("{}destination={} canal connector 成功断开连接", new Object[]{this.logTopic, this.configure.getDestination()});
        } catch (Exception e) {
            this.logger.warn("{}canal connector disconnect异常", new Object[]{this.logTopic, e});
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends a description string to this client's description list.
     * (Decompiler note: the original access level was package-private.)
     */
    public void appendDesc(String str) {
        this.descs.add(str);
    }

    /**
     * @return the live (mutable) list of descriptions added via {@link #appendDesc(String)}
     */
    List<String> getDescs() {
        return this.descs;
    }
}
