package com.zhidian.cloud.canal.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.zhidian.cloud.common.logger.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import jodd.util.StringPool;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

/* loaded from: input_file:BOOT-INF/lib/canal-core-0.0.4.jar:com/zhidian/cloud/canal/canal/CanalMessageHandler.class */
/**
 * Thread that pulls batches from a Canal server and hands them to the configured
 * {@link CanalProcessor}s.
 *
 * <p>Two threads cooperate:
 * <ul>
 *   <li>this thread ({@link #run()} / {@link #doRun()}) connects, subscribes, fetches batches with
 *       {@code getWithoutAck} and enqueues them into a bounded queue;</li>
 *   <li>a consumer thread started from {@link #run()} takes batches from the queue, passes every
 *       entry to the processors and acknowledges the batch.</li>
 * </ul>
 *
 * <p>NOTE(review): the same {@code CanalConnector} instance is used from both threads
 * (fetch on one, ack/rollback on the other); confirm that the connector implementation in use
 * tolerates this.
 */
public class CanalMessageHandler extends Thread {
    /** Maximum number of fetched-but-unprocessed batches held in memory. */
    private static final int QUEUE_CAPACITY = 50;
    /** Separator written after every column and every entry while building diagnostics. */
    private static final String SEPARATOR = ", ";

    String canalId;
    CanalProperties properties;
    CanalConnector connector;
    CanalAlerter alerter;
    /** Never {@code null}: defaults to an empty list and {@link #setProcessors(List)} normalizes null. */
    List<CanalProcessor> processors = new ArrayList<>();
    DataSourceTransactionManager transactionManager;
    Logger logger = Logger.getLogger(getClass());
    AtomicInteger reconnectCounter = new AtomicInteger(0);
    private final LinkedBlockingQueue<Message> customTableQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
    /**
     * Id of the last acknowledged batch (0 = none seen yet). Written by the fetch thread and read
     * and written by the consumer thread, hence {@code volatile} so each side sees the other's update.
     */
    private volatile long currentBatchId = 0;

    /**
     * Starts the consumer thread, then keeps (re)connecting and fetching while the handler is enabled.
     *
     * <p>On a {@code CanalClientException} the connector is disconnected and the failure counter is
     * incremented. While the configured reconnect count is {@code -1} or has not been exceeded, the
     * thread sleeps for the reconnect interval and retries; otherwise it logs an error, alerts (when
     * an alerter is set) and parks in ten-minute sleeps until the counter obtained from
     * {@link #getReconnectCounter()} no longer exceeds the configured count.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        new Thread(this::handleMsg).start();
        while (this.properties.getEnabled().booleanValue()) {
            try {
                this.connector.connect();
                this.connector.subscribe(this.properties.getSubscribe());
                doRun();
            } catch (CanalClientException e) {
                try {
                    this.connector.disconnect();
                    // NOTE(review): the expected batch id is reset to 1 (not 0) after a disconnect;
                    // presumably the server restarts its batch numbering - confirm.
                    this.currentBatchId = 1L;
                } catch (CanalClientException e2) {
                    this.logger.warn(StringUtils.defaultIfBlank(e2.getMessage(), "断开服务失败"), e2);
                }
                this.reconnectCounter.incrementAndGet();
                int allowedReconnects = this.properties.getReconnectCount().intValue();
                if (allowedReconnects == -1 || allowedReconnects >= this.reconnectCounter.get()) {
                    this.logger.warn(String.format("服务 %s:%s 连接失败，%s 毫秒后重新连接", this.properties.getHost(), this.properties.getPort(), this.properties.getReconnectInterval()), e);
                    sleepx(this.properties.getReconnectInterval().longValue());
                } else {
                    this.logger.error(String.format("服务 %s:%s 不可用，停止访问，请手动重启", this.properties.getHost(), this.properties.getPort()));
                    if (this.alerter != null) {
                        this.alerter.alert(String.format("服务 %s:%s 不可用", this.properties.getHost(), this.properties.getPort()), e);
                    }
                    // Park until the counter is lowered from outside (see getReconnectCounter()).
                    while (this.properties.getReconnectCount().intValue() < this.reconnectCounter.get()) {
                        sleepx(600000L);
                    }
                }
            }
        }
    }

    /**
     * Fetch loop: pulls up to 5000 entries per batch without acknowledging and enqueues each
     * non-empty batch for the consumer thread. Sleeps three seconds when nothing was returned.
     *
     * <p>If enqueueing fails, the failure is logged and alerted, the local transaction (if any) is
     * rolled back and the batch is rolled back on the connector.
     *
     * <p>NOTE(review): the transaction opened here only spans the enqueue; the processors run on
     * the consumer thread, outside of it.
     *
     * @throws CanalClientException propagated from the connector; handled by {@link #run()}
     */
    public void doRun() throws CanalClientException {
        while (this.properties.getEnabled().booleanValue()) {
            Message batch = this.connector.getWithoutAck(5000);
            if (batch.getId() == -1 || batch.getEntries().size() == 0) {
                sleepx(3000L);
                continue;
            }
            // First real batch after start-up seeds the expected id for the consumer.
            if (this.currentBatchId == 0 && batch.getId() != -1) {
                this.currentBatchId = batch.getId();
            }
            TransactionStatus transactionStatus = null;
            if (this.transactionManager != null) {
                transactionStatus = this.transactionManager.getTransaction(new DefaultTransactionDefinition(0));
            }
            try {
                this.customTableQueue.put(batch);
                if (transactionStatus != null) {
                    this.transactionManager.commit(transactionStatus);
                }
            } catch (Exception e) {
                // Build the (potentially large) description once and reuse it for log and alert.
                String failure = String.format("Message{id=%s} 处理失败 %s", Long.valueOf(batch.getId()), obtainRowChangeContent(batch));
                this.logger.error(failure, e);
                if (this.alerter != null) {
                    this.alerter.alert(failure, e);
                }
                if (transactionStatus != null) {
                    this.transactionManager.rollback(transactionStatus);
                }
                this.connector.rollback(batch.getId());
            }
        }
    }

    /**
     * Consumer loop: takes batches from the queue, processes a batch when its id equals the last
     * acknowledged id or that id plus one, acknowledges it and records it as the current id.
     * A batch with any other id is logged as out of order and put back at the tail of the queue.
     *
     * <p>On any exception the batch is rolled back on the connector (when an id is known), the
     * failure is logged with its stack trace, and the loop ends.
     *
     * <p>NOTE(review): once this method returns nothing drains the queue any more, so the fetch
     * thread will eventually block in {@code put}; confirm this stop-on-first-failure is intended.
     */
    private void handleMsg() {
        long batchId = 0;
        while (true) {
            try {
                Message message = this.customTableQueue.take();
                batchId = message.getId();
                // Single volatile read so the check and the log lines agree on one value.
                long lastAcked = this.currentBatchId;
                if (batchId - 1 == lastAcked || batchId == lastAcked) {
                    handleMessage(message);
                    this.connector.ack(batchId);
                    this.currentBatchId = batchId;
                } else {
                    this.logger.warn("==========当前batchid不等于batchId-1 乱序了================");
                    this.logger.warn("当前的Id是: " + lastAcked);
                    this.logger.warn("从队列拿出来的ID是: " + batchId);
                    this.customTableQueue.put(message);
                }
            } catch (Exception e) {
                this.logger.warn("当前回滚的batchId是" + batchId);
                if (batchId > 0) {
                    try {
                        this.connector.rollback(batchId);
                    } catch (CanalClientException rollbackFailure) {
                        // Do not let a failed rollback hide the original processing error below.
                        this.logger.warn(StringUtils.defaultIfBlank(rollbackFailure.getMessage(), "回滚失败"), rollbackFailure);
                    }
                }
                // Pass the throwable itself: concatenating getStackTrace() only prints an array identity.
                this.logger.error("处理消息异常" + e.getMessage(), e);
                if (e instanceof InterruptedException) {
                    // The thread is about to exit; keep the interrupt visible to whoever inspects it.
                    Thread.currentThread().interrupt();
                }
                return;
            }
        }
    }

    /**
     * Passes every entry that carries both a schema and a table name to all processors.
     * The row change is decoded once per entry and entries that cannot be decoded are skipped.
     *
     * @param message batch whose entries are dispatched
     * @throws Exception whatever a processor throws
     */
    private void handleMessage(Message message) throws Exception {
        for (CanalEntry.Entry entry : message.getEntries()) {
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            if (schemaName.isEmpty() || tableName.isEmpty()) {
                continue;
            }
            // NOTE(review): debug output on stdout kept as-is; consider routing it through the logger.
            System.out.println("======tableName========" + tableName);
            if (this.processors.isEmpty()) {
                continue;
            }
            // Decoded once per entry instead of once per processor; assumes getRowChange is a pure decode.
            CanalEntry.RowChange rowChange = CanalHelper.getRowChange(entry);
            if (rowChange == null) {
                continue;
            }
            for (CanalProcessor canalProcessor : this.processors) {
                canalProcessor.process(rowChange, entry, message);
            }
        }
    }

    /**
     * Sleeps for the given time.
     *
     * <p>NOTE(review): an interrupt is logged and swallowed (the flag is not restored). Restoring it
     * would make the retry loops in {@link #run()} spin, so the behaviour is left unchanged.
     *
     * @param j sleep duration in milliseconds
     * @return {@code true} if the full sleep elapsed, {@code false} if it was cut short or failed
     */
    private boolean sleepx(long j) {
        try {
            Thread.sleep(j);
            return true;
        } catch (Exception e) {
            this.logger.warn(StringUtils.defaultIfBlank(e.getMessage(), "休眠被迫中止"), e);
            return false;
        }
    }

    /**
     * Builds a human-readable summary of a batch for log and alert messages, shaped like
     * {@code [table{INSERT a=1, b=2}, other{UPDATE c=3(2)}]}.
     *
     * <p>Only entries accepted by at least one processor and decodable into a row change are
     * listed, each once.
     *
     * @param message batch to describe
     * @return the bracketed summary; {@code "[]"} when no entry qualifies
     */
    private String obtainRowChangeContent(Message message) {
        StringBuilder content = new StringBuilder("[");
        for (CanalEntry.Entry entry : message.getEntries()) {
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            if (!isAcceptedByAnyProcessor(schemaName, tableName)) {
                continue;
            }
            CanalEntry.RowChange rowChange = CanalHelper.getRowChange(entry);
            if (rowChange == null) {
                continue;
            }
            content.append(describeRowChange(tableName, rowChange)).append(SEPARATOR);
        }
        stripTrailingSeparator(content);
        return content.append("]").toString();
    }

    /** Returns whether any configured processor reports the schema/table pair as acceptable. */
    private boolean isAcceptedByAnyProcessor(String schemaName, String tableName) {
        for (CanalProcessor processor : this.processors) {
            if (processor.acceptable(schemaName, tableName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Renders one row change as {@code table{TYPE col=value, ...}}. QUERY and DDL changes, and
     * event types other than INSERT/UPDATE/DELETE, render as {@code table{}}.
     */
    private static String describeRowChange(String tableName, CanalEntry.RowChange rowChange) {
        StringBuilder text = new StringBuilder();
        text.append(tableName);
        text.append(StringPool.LEFT_BRACE);
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (eventType != CanalEntry.EventType.QUERY && !rowChange.getIsDdl()) {
            if (eventType == CanalEntry.EventType.INSERT) {
                text.append("INSERT ");
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    for (int i = 0; i < rowData.getAfterColumnsCount(); i++) {
                        CanalEntry.Column after = rowData.getAfterColumns(i);
                        text.append(after.getName()).append(StringPool.EQUALS).append(after.getValue());
                        text.append(SEPARATOR);
                    }
                }
            } else if (eventType == CanalEntry.EventType.UPDATE) {
                text.append("UPDATE ");
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    int beforeCount = rowData.getBeforeColumnsCount();
                    for (int i = 0; i < rowData.getAfterColumnsCount(); i++) {
                        CanalEntry.Column after = rowData.getAfterColumns(i);
                        text.append(after.getName()).append(StringPool.EQUALS).append(after.getValue());
                        text.append("(");
                        // The before image may hold fewer columns than the after image; never index past it.
                        if (i < beforeCount) {
                            text.append(rowData.getBeforeColumns(i).getValue());
                        }
                        text.append(")");
                        text.append(SEPARATOR);
                    }
                }
            } else if (eventType == CanalEntry.EventType.DELETE) {
                text.append("DELETE ");
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    for (int i = 0; i < rowData.getBeforeColumnsCount(); i++) {
                        CanalEntry.Column before = rowData.getBeforeColumns(i);
                        text.append(before.getName()).append(StringPool.EQUALS).append(before.getValue());
                        text.append(SEPARATOR);
                    }
                }
            }
        }
        stripTrailingSeparator(text);
        return text.append("}").toString();
    }

    /** Removes a trailing {@code ", "} if, and only if, the buffer actually ends with one. */
    private static void stripTrailingSeparator(StringBuilder buffer) {
        int start = buffer.length() - SEPARATOR.length();
        if (start >= 0 && buffer.indexOf(SEPARATOR, start) == start) {
            buffer.setLength(start);
        }
    }

    /**
     * @return the live counter of connection failures; {@link #run()} compares it against the
     *         configured reconnect count
     */
    public AtomicInteger getReconnectCounter() {
        return this.reconnectCounter;
    }

    /** @return the identifier assigned through {@link #setCanalId(String)} */
    public String getCanalId() {
        return this.canalId;
    }

    /** @param str identifier of this handler */
    public void setCanalId(String str) {
        this.canalId = str;
    }

    /** @param canalProperties connection and retry settings read by the fetch loop */
    public void setProperties(CanalProperties canalProperties) {
        this.properties = canalProperties;
    }

    /** @param canalConnector connector used to fetch, acknowledge and roll back batches */
    public void setConnector(CanalConnector canalConnector) {
        this.connector = canalConnector;
    }

    /** @param canalAlerter optional alert sink; may be {@code null} to disable alerting */
    public void setAlerter(CanalAlerter canalAlerter) {
        this.alerter = canalAlerter;
    }

    /** @param dataSourceTransactionManager optional transaction manager; may be {@code null} */
    public void setTransactionManager(DataSourceTransactionManager dataSourceTransactionManager) {
        this.transactionManager = dataSourceTransactionManager;
    }

    /** @param list processors to dispatch entries to; {@code null} is treated as an empty list */
    public void setProcessors(List<CanalProcessor> list) {
        if (list == null) {
            list = new ArrayList<>();
        }
        this.processors = list;
    }
}
