package net.wicp.tams.common.binlog.alone.parser;

import io.thekraken.grok.api.exception.GrokException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.GrokObj;
import net.wicp.tams.common.apiext.PwdUtil;
import net.wicp.tams.common.apiext.ReflectAssist;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.apiext.UUIDGenerator;
import net.wicp.tams.common.apiext.jdbc.JdbcAssit;
import net.wicp.tams.common.apiext.jdbc.JdbcConnection;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.binlog.alone.normalize.IBinlogListener;
import net.wicp.tams.common.binlog.alone.normalize.ISaveCheckPoint;
import net.wicp.tams.common.binlog.parser.event.GtidLogEvent;
import net.wicp.tams.common.binlog.parser.event.QueryLogEvent;
import net.wicp.tams.common.binlog.parser.event.RowsLogBuffer;
import net.wicp.tams.common.binlog.parser.event.RowsLogEvent;
import net.wicp.tams.common.binlog.parser.event.TableMapLogEvent;
import net.wicp.tams.common.binlog.parser.event.XidLogEvent;
import net.wicp.tams.common.constant.DateFormatCase;
import net.wicp.tams.common.constant.OptType;
import net.wicp.tams.common.constant.StrPattern;
import net.wicp.tams.common.constant.dic.YesOrNo;
import net.wicp.tams.common.exception.ExceptAll;
import net.wicp.tams.common.exception.ProjectExceptionRuntime;
import net.wicp.tams.duckula.client.Protobuf3;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wicp/tams/common/binlog/alone/parser/BaseLogFetcher.class */
public abstract class BaseLogFetcher {
    private static final Logger log = LoggerFactory.getLogger(BaseLogFetcher.class);
    /** GTID string of the most recent GTID event; copied into every row event that is emitted. */
    protected String gtids;
    /** Connection settings built from the builder near the end of the constructor. */
    protected final ListenerConf.ConnConf connConf;
    /** Metric group created in the constructor and named "ip:port" of the configured server. */
    protected final BinlogMetricGroup metric;
    /** Listener instantiated reflectively from the configured class name; receives each built row event. */
    private final IBinlogListener binlogListener;
    /** Checkpoint store; also used here to take the start-up lock and to persist/load column history. */
    protected ISaveCheckPoint saveCheckPoint;
    /** Column-history cache keyed by the lower-cased "db|tb" string; lists are sorted newest first. */
    protected final Map<String, List<ListenerConf.ColHis>> colsMap;
    /** Transaction id taken from the last XID event. */
    protected long xid;
    /** Lazily prepared statement used by findCols to read column names and data types. */
    private PreparedStatement prepCols;
    /** Lazily prepared statement used by findCols to check whether the table has a primary key. */
    private PreparedStatement prepRowkey;
    /** Binlog file name; not read in this class (presumably maintained by subclasses - confirm). */
    protected String fileName = "mysql-bin.000001";
    /** Charset whose name is passed to the rows buffer when decoding row events. */
    protected Charset charset = Charset.forName("utf-8");
    /** Grok matcher; the constructor registers the patterns used to pull table names out of ALTER statements. */
    private GrokObj gm = GrokObj.getInstance();

    public IBinlogListener getBinlogListener() {
        return this.binlogListener;
    }

    /**
     * Validates the connection settings, fills in defaults, and wires up the checkpoint store,
     * the metric group and the binlog listener.
     *
     * <p>Defaults applied to the builder: port 3306, a generated client id, the match-everything
     * pattern {@code ^.*$} for databases and tables, and the H2-based checkpoint implementation.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally {@code protected}.
     * The overridable {@link #init} hook runs from here before {@code connConf}, {@code metric}
     * and the listener are assigned, so implementations must not read those fields.
     *
     * @param builder mutable connection settings; ip, user, password and listener must be non-blank
     * @throws ProjectExceptionRuntime if the checkpoint store cannot be created, the lock cannot be
     *     acquired, or the listener class is missing, of the wrong type or cannot be instantiated
     */
    public BaseLogFetcher(ListenerConf.ConnConf.Builder builder) {
        // NOTE(review): inside a character class ".-:" is a range ('.' through ':'), so these
        // classes accept '/' but not a literal '-'. Left unchanged; confirm the intended set.
        this.gm.addPattern("tb", "[A-Za-z0-9_.-:]+");
        this.gm.addPattern("tball", "alter\\s+table\\s+`?%{tb}`?");
        this.gm.addPattern("db", "[A-Za-z0-9_.-:]+");
        this.gm.addPattern("tball2", "alter\\s+table\\s+`?%{db}`.`%{tb}`?");
        this.colsMap = new HashMap<String, List<ListenerConf.ColHis>>();

        Validate.notBlank(builder.getIp(), "需要ip地址", new Object[0]);
        Validate.notBlank(builder.getUser(), "需要用户名", new Object[0]);
        Validate.notBlank(builder.getPassword(), "需要密码", new Object[0]);
        Validate.notBlank(builder.getListener(), "需要处理的监听类", new Object[0]);

        if (builder.getPort() <= 0) {
            builder.setPort(3306);
        }
        if (builder.getClientId() <= 0) {
            builder.setClientId(StringUtil.buildPort(new UUIDGenerator().generate()));
        }
        if (StringUtil.isNull(builder.getDbPattern())) {
            builder.setDbPattern("^.*$");
        }
        if (StringUtil.isNull(builder.getTbPattern())) {
            builder.setTbPattern("^.*$");
        }
        if (StringUtil.isNull(builder.getChk())) {
            builder.setChk("net.wicp.tams.common.binlog.alone.checkpoint.CheckPointH2db");
        }

        try {
            this.saveCheckPoint = (ISaveCheckPoint) Class.forName(builder.getChk()).newInstance();
            this.saveCheckPoint.init(builder);
            init(builder);
            YesOrNo lockState = this.saveCheckPoint.acquireLock();
            if (lockState == null || lockState == YesOrNo.no) {
                throw new ProjectExceptionRuntime(ExceptAll.project_other, "不能获得分布式锁");
            }
            this.connConf = builder.build();
            this.metric = new BinlogMetricGroup(String.format("%s:%s", this.connConf.getIp(), Integer.valueOf(this.connConf.getPort())));
            try {
                Class<?> listenerClass = Class.forName(this.connConf.getListener());
                if (!ReflectAssist.isInterface(listenerClass, "net.wicp.tams.common.binlog.alone.normalize.IBinlogListener")) {
                    log.error("Listener需要net.wicp.tams.common.binlog.alone.IBinlogListener类型");
                    throw new ProjectExceptionRuntime(ExceptAll.Param_typenofit, "Listener需要net.wicp.tams.common.binlog.alone.IBinlogListener类型");
                }
                try {
                    this.binlogListener = (IBinlogListener) ReflectAssist.newInst(listenerClass);
                } catch (Exception instantiationFailure) {
                    log.error("Listener实例化失败", instantiationFailure);
                    throw new ProjectExceptionRuntime(ExceptAll.Param_typenofit, "Listener实例化失败");
                }
            } catch (ClassNotFoundException missingListener) {
                log.error("没有指定的Listener", missingListener);
                throw new ProjectExceptionRuntime(ExceptAll.Param_typenofit, "没有指定的Listener");
            }
        } catch (ProjectExceptionRuntime specific) {
            // Already carries an accurate code and message (lock failure, bad listener, ...);
            // do not let the generic handler below relabel it as a checkpoint failure.
            throw specific;
        } catch (Exception unexpected) {
            log.error("创建checkpoint实例失败", unexpected);
            throw new ProjectExceptionRuntime(ExceptAll.project_other, "创建checkpoint实例失败");
        }
    }

    /**
     * Handles a query event: when the statement starts with {@code alter}, extracts the table name
     * with the grok patterns registered in the constructor and refreshes that table's column
     * history through {@link #findCols}.
     *
     * <p>Statements that are not ALTERs, or whose table name cannot be extracted (the grok match
     * yields no {@code tb} capture), are ignored.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected}.
     *
     * @param queryLogEvent the query event read from the binlog
     */
    public void parseQueryEvent(QueryLogEvent queryLogEvent) {
        String dbName = queryLogEvent.getDbName();
        String query = queryLogEvent.getQuery();
        if (dbName == null || query == null) {
            return;
        }
        String sql = query.toLowerCase();
        if (!sql.startsWith("alter")) {
            return;
        }
        // The statement was lower-cased, so the database prefix has to be lower-cased as well,
        // otherwise "MyDb.tb" is never stripped down to "tb".
        String lowerDb = dbName.toLowerCase();
        try {
            Object matched = this.gm.match("%{tball}", sql).toMap().get("tb");
            if (matched == null) {
                // Not an "alter table ..." statement; String.valueOf(null) would otherwise
                // produce the literal table name "null".
                log.debug("no table name found in alter statement: {}", sql);
                return;
            }
            String table = String.valueOf(matched).replace(lowerDb + ".", "");
            if (lowerDb.equals(table)) {
                // Quoted form `db`.`tb`: the first pattern stopped at the database name.
                Object qualified = this.gm.match("%{tball2}", sql).toMap().get("tb");
                if (qualified == null) {
                    log.debug("no table name found in alter statement: {}", sql);
                    return;
                }
                table = String.valueOf(qualified);
            }
            if (isValid(dbName, table)) {
                findCols(dbName, table, queryLogEvent.getWhen());
            }
        } catch (GrokException e) {
            log.error("get tb from sql error", e);
        }
    }

    /**
     * Records the GTID carried by the event and then hands the event to the subclass hook.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected}.
     *
     * @param gtidLogEvent the GTID event read from the binlog
     * @throws Exception declared by the original signature
     */
    public void parseGtidLogEvent(GtidLogEvent gtidLogEvent) throws Exception {
        // The field is updated first so the hook already observes the new GTID.
        final String latestGtid = gtidLogEvent.getGtid();
        this.gtids = latestGtid;
        this.parseGtidLogEventSub(gtidLogEvent);
    }

    /**
     * Subclass hook invoked by parseGtidLogEvent after the {@code gtids} field has been updated.
     *
     * @param gtidLogEvent the GTID event being processed
     */
    protected abstract void parseGtidLogEventSub(GtidLogEvent gtidLogEvent);

    /**
     * Subclass initialisation hook. It is called from the constructor after the checkpoint store
     * has been created and initialised, but before the lock is acquired and before
     * {@code connConf}, {@code metric} and the listener are assigned.
     *
     * @param builder the (already defaulted) connection settings
     */
    protected abstract void init(ListenerConf.ConnConf.Builder builder);

    /** Releases whatever the concrete fetcher holds; presumably stops reading - confirm in subclasses. */
    public abstract void close();

    /** Reads binlog events; presumably the main fetch loop of the concrete fetcher - confirm in subclasses. */
    public abstract void read();

    /**
     * Remembers the transaction id carried by an XID event.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected}.
     *
     * @param xidLogEvent the XID event read from the binlog
     */
    public void parseXidEvent(XidLogEvent xidLogEvent) {
        final long latestXid = xidLogEvent.getXid();
        this.xid = latestXid;
    }

    /**
     * Decides whether events for the given database/table should be processed.
     *
     * @param str database name
     * @param str2 table name
     * @return {@code false} for the configured checkpoint default database or when either name
     *     fails its configured pattern; {@code true} otherwise
     */
    private boolean isValid(String str, String str2) {
        // The database configured under this key is always skipped.
        if (str.equals(Conf.get("common.binlog.alone.chk.mysql.defaultdb"))) {
            return false;
        }
        if (!StrPattern.checkStrFormat(this.connConf.getDbPattern(), str)) {
            return false;
        }
        return StrPattern.checkStrFormat(this.connConf.getTbPattern(), str2);
    }

    /**
     * Decodes every row of a rows event and passes one {@code DuckulaEvent} per row to the listener.
     *
     * <p>Events for databases/tables rejected by {@code isValid} are skipped. Header types 23/30
     * (MySQL write-rows v1/v2) contribute only an after image, 25/32 (delete-rows v1/v2) only a
     * before image, and every other type is treated as an update with a before and an after image.
     *
     * <p>NOTE(review): the decompiler reports this method was originally {@code protected}.
     * NOTE(review): {@code initData} stores both images through {@code putAfter}, so before-image
     * values also land in the "after" map - confirm this is intended.
     *
     * @param rowsLogEvent the rows event to decode
     * @param optType the operation type stamped on every emitted event
     * @return {@code false} if the table is filtered out, {@code true} once the rows were dispatched
     * @throws RuntimeException wrapping any failure while decoding or dispatching rows
     */
    public boolean parseRowsEvent(RowsLogEvent rowsLogEvent, OptType optType) {
        String dbName = rowsLogEvent.getTable().getDbName();
        String tableName = rowsLogEvent.getTable().getTableName();
        if (log.isDebugEnabled()) {
            log.debug("db:{},tb:{},time:{}", new Object[]{dbName, tableName, DateFormatCase.YYYY_MM_DD_hhmmss.getInstanc().format(new Date(rowsLogEvent.getHeader().getWhen() * 1000))});
        }
        if (!isValid(dbName, tableName)) {
            return false;
        }
        this.metric.meter_parser_pack_row.mark();
        ListenerConf.ColHis colHis = ValidKey(dbName, tableName, rowsLogEvent.getWhen());
        List<String[]> beforeRows = new ArrayList<String[]>();
        List<String[]> afterRows = new ArrayList<String[]>();
        int rowCount = 0;
        try {
            RowsLogBuffer rowsBuf = rowsLogEvent.getRowsBuf(this.charset.name());
            BitSet columns = rowsLogEvent.getColumns();
            BitSet changeColumns = rowsLogEvent.getChangeColumns();
            // The header type is the same for every row of the event, so read it once.
            int eventType = rowsLogEvent.getHeader().getType();
            boolean insert = 23 == eventType || 30 == eventType;
            boolean delete = 25 == eventType || 32 == eventType;
            while (rowsBuf.nextOneRow(columns)) {
                if (insert) {
                    afterRows.add(parseOneRow(rowsLogEvent, rowsBuf, columns, colHis.getColsCount(), true));
                } else if (delete) {
                    beforeRows.add(parseOneRow(rowsLogEvent, rowsBuf, columns, colHis.getColsCount(), false));
                } else {
                    beforeRows.add(parseOneRow(rowsLogEvent, rowsBuf, columns, colHis.getColsCount(), true));
                    if (!rowsBuf.nextOneRow(changeColumns)) {
                        // Before image without its after image: the incomplete row is not counted.
                        break;
                    }
                    afterRows.add(parseOneRow(rowsLogEvent, rowsBuf, changeColumns, colHis.getColsCount(), true));
                }
                rowCount++;
            }

            // Width of the first decoded row; fall back to the column history when the event
            // carried no rows at all instead of indexing into an empty array.
            int colNum;
            if (!afterRows.isEmpty()) {
                colNum = afterRows.get(0).length;
            } else if (!beforeRows.isEmpty()) {
                colNum = beforeRows.get(0).length;
            } else {
                colNum = colHis.getColsCount();
            }

            Protobuf3.DuckulaEvent.Builder template = Protobuf3.DuckulaEvent.newBuilder();
            template.setColNum(colNum);
            template.addAllCols(colHis.mo57getColsList());
            for (TableMapLogEvent.ColumnInfo columnInfo : rowsLogEvent.getTable().getColumnInfo()) {
                template.addColsTypeValue(columnInfo.type);
            }
            template.setDb(dbName);
            template.setTb(tableName);
            template.setGtid(this.gtids);
            template.setIsError(false);
            template.setOptType(Protobuf3.OptType.forNumber(optType.getValue()));

            // One event per row: shared header fields come from the template, values per row.
            for (int row = 0; row < rowCount; row++) {
                Protobuf3.DuckulaEvent.Builder event = template.clone();
                initData(event, beforeRows, colHis.mo57getColsList(), row);
                initData(event, afterRows, colHis.mo57getColsList(), row);
                this.binlogListener.doBui(event.build());
            }
            this.metric.meter_parser_event.mark(rowCount);
            this.metric.counter_ringbuff_pack.inc();
            this.metric.counter_ringbuff_event.inc(rowCount);
            return true;
        } catch (Exception e) {
            throw new RuntimeException("parse row data failed.", e);
        }
    }

    /**
     * Decodes the row the buffer is currently positioned on into one string per table column.
     *
     * <p>Columns absent from {@code bitSet} and columns whose value is SQL NULL stay {@code null}
     * in the result; {@code byte[]} values are base64-encoded, everything else goes through
     * {@code String.valueOf}.
     *
     * @param rowsLogEvent event that owns the row; supplies the table map
     * @param rowsLogBuffer buffer positioned on the row to decode
     * @param bitSet columns present in this row image
     * @param i expected column count (from the column history); must equal the table map's count
     * @param z not used by this implementation
     * @return array with one slot per table column
     * @throws RuntimeException if the event has no table map or the column counts disagree
     * @throws UnsupportedEncodingException declared by the original signature
     */
    protected String[] parseOneRow(RowsLogEvent rowsLogEvent, RowsLogBuffer rowsLogBuffer, BitSet bitSet, int i, boolean z) throws UnsupportedEncodingException {
        final TableMapLogEvent tableMap = rowsLogEvent.getTable();
        if (tableMap == null) {
            throw new RuntimeException("not found TableMap with tid=" + rowsLogEvent.getTableId());
        }
        final int columnTotal = tableMap.getColumnCnt();
        if (i != columnTotal) {
            throw new RuntimeException("TableMap:" + rowsLogEvent.getTableId() + " the colsname is:" + i + " the value size is:" + columnTotal);
        }

        final String[] decoded = new String[columnTotal];
        final TableMapLogEvent.ColumnInfo[] infos = tableMap.getColumnInfo();
        for (int col = 0; col < columnTotal; col++) {
            if (!bitSet.get(col)) {
                continue;
            }
            final TableMapLogEvent.ColumnInfo info = infos[col];
            rowsLogBuffer.nextValue(info.type, info.meta);
            if (rowsLogBuffer.isNull()) {
                continue;
            }
            final Object cell = rowsLogBuffer.getValue();
            decoded[col] = cell instanceof byte[] ? PwdUtil.base64FromBin((byte[]) cell) : String.valueOf(cell);
        }
        return decoded;
    }

    /**
     * Reads the current column names and data types of a table from {@code information_schema},
     * stores them as a new column-history entry in the checkpoint store and refreshes the cached
     * history list for the table (sorted newest first).
     *
     * <p>When the connection is flagged as RDS and the table has no primary key, a synthetic
     * {@code _rowkey_} column of type {@code varchar} is appended.
     *
     * @param str database name
     * @param str2 table name
     * @param j entry time in epoch seconds, or {@code -1} to label the entry with the current time
     * @return the history entry that was just created
     * @throws RuntimeException if no columns are visible for the table or any JDBC/checkpoint call fails
     */
    public ListenerConf.ColHis findCols(String str, String str2, long j) {
        String cacheKey = String.format("%s|%s", str, str2).toLowerCase();
        Connection connection = null;
        try {
            connection = JdbcConnection.getConnection("com.mysql.jdbc.Driver", String.format("jdbc:mysql://%s:%s?autoReconnect=true&useUnicode=true&characterEncoding=utf-8", this.connConf.getIp(), Integer.valueOf(this.connConf.getPort())), this.connConf.getUser(), this.connConf.getPassword());
            if (this.prepCols == null || this.prepCols.isClosed()) {
                // Column names are later matched to binlog values by position, so the order must
                // be the table's ordinal order; information_schema does not guarantee it implicitly.
                this.prepCols = connection.prepareStatement("select   column_name,data_type   from  information_schema.columns  where  table_schema=? and table_name=? order by ordinal_position");
            }
            List<String> colNames = new ArrayList<String>();
            List<String> colTypes = new ArrayList<String>();
            JdbcAssit.setPreParam(this.prepCols, new Object[]{str, str2});
            ResultSet colRows = this.prepCols.executeQuery();
            try {
                while (colRows.next()) {
                    colNames.add(colRows.getString(1));
                    colTypes.add(colRows.getString(2));
                }
            } finally {
                colRows.close();
            }
            if (CollectionUtils.isEmpty(colNames)) {
                String message = String.format("db:%s,td:%s,user:%s 没有s查询到列名，请检查用户是否有此权限", str, str2, this.connConf.getUser());
                log.error(message);
                throw new RuntimeException(message);
            }
            if (this.connConf.getRds()) {
                if (this.prepRowkey == null || this.prepRowkey.isClosed()) {
                    this.prepRowkey = connection.prepareStatement("SELECT k.column_name FROM information_schema.table_constraints t JOIN information_schema.key_column_usage k USING (constraint_name,table_schema,table_name) WHERE t.constraint_type='PRIMARY KEY' AND t.table_schema=? AND t.table_name=?");
                }
                JdbcAssit.setPreParam(this.prepRowkey, new Object[]{str, str2});
                ResultSet keyRows = this.prepRowkey.executeQuery();
                try {
                    if (!keyRows.next()) {
                        // No primary key: append the synthetic row-key column.
                        colNames.add("_rowkey_");
                        colTypes.add("varchar");
                    }
                } finally {
                    keyRows.close();
                }
            }
            ListenerConf.ColHis.Builder entry = ListenerConf.ColHis.newBuilder();
            entry.setTime(j);
            entry.setTimeStr(DateFormatCase.YYYY_MM_DD_hhmmss.getInstanc().format(Long.valueOf(j == -1 ? System.currentTimeMillis() : j * 1000)));
            entry.setServerIp(this.connConf.getIp());
            entry.setDb(str);
            entry.setTb(str2);
            entry.addAllCols(colNames);
            entry.addAllColTypes(colTypes);
            ListenerConf.ColHis colHis = entry.m90build();
            this.saveCheckPoint.saveColName(colHis);
            List<ListenerConf.ColHis> history = this.saveCheckPoint.findColsList(str, str2);
            Collections.sort(history, new Comparator<ListenerConf.ColHis>() {
                @Override
                public int compare(ListenerConf.ColHis left, ListenerConf.ColHis right) {
                    // Newest first; compared directly rather than by subtraction to avoid overflow.
                    long leftTime = left.getTime();
                    long rightTime = right.getTime();
                    if (rightTime > leftTime) {
                        return 1;
                    }
                    return rightTime < leftTime ? -1 : 0;
                }
            });
            this.colsMap.put(cacheKey, history);
            return colHis;
        } catch (Exception e) {
            log.error("获取cols错误", e);
            throw new RuntimeException("获取cols错误", e);
        } finally {
            // Always release the connection, including on the failure paths.
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException closeFailure) {
                    log.error("关闭连接失败", closeFailure);
                }
            }
        }
    }

    /**
     * Returns the column-history entry that applies to an event on the given table at time {@code j}.
     *
     * <p>The history comes from the in-memory cache, else from the checkpoint store (sorted newest
     * first and then cached). If the store has none, the columns are read from the server via
     * {@link #findCols} and that fresh entry is returned directly.
     *
     * @param str database name
     * @param str2 table name
     * @param j event time, compared against the {@code time} of each history entry
     * @return the newest history entry whose time is strictly less than {@code j}
     * @throws RuntimeException if the known history contains no entry older than {@code j}
     */
    public ListenerConf.ColHis ValidKey(String str, String str2, long j) {
        String cacheKey = String.format("%s|%s", str, str2).toLowerCase();
        List<ListenerConf.ColHis> history;
        if (this.colsMap.containsKey(cacheKey)) {
            history = this.colsMap.get(cacheKey);
        } else {
            history = this.saveCheckPoint.findColsList(str, str2);
            if (CollectionUtils.isEmpty(history)) {
                // Nothing recorded yet: read the current columns from the server.
                ListenerConf.ColHis current = findCols(str, str2, -1L);
                List<ListenerConf.ColHis> single = new ArrayList<ListenerConf.ColHis>();
                single.add(current);
                this.colsMap.put(cacheKey, single);
                return current;
            }
            Collections.sort(history, new Comparator<ListenerConf.ColHis>() {
                @Override
                public int compare(ListenerConf.ColHis left, ListenerConf.ColHis right) {
                    // Newest first; compared directly rather than by subtraction to avoid overflow.
                    long leftTime = left.getTime();
                    long rightTime = right.getTime();
                    if (rightTime > leftTime) {
                        return 1;
                    }
                    return rightTime < leftTime ? -1 : 0;
                }
            });
            this.colsMap.put(cacheKey, history);
        }
        ListenerConf.ColHis fit = selFitColName(j, history);
        if (fit == null) {
            throw new RuntimeException("没有可用的col信息。");
        }
        return fit;
    }

    /**
     * Picks the first entry, in list order, whose time is strictly less than {@code j}.
     *
     * @param j the time to compare against
     * @param list history entries (callers pass them sorted newest first)
     * @return the first matching entry, or {@code null} if none matches
     */
    private ListenerConf.ColHis selFitColName(long j, List<ListenerConf.ColHis> list) {
        for (ListenerConf.ColHis candidate : list) {
            if (candidate.getTime() < j) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Copies the non-null values of row {@code i} into the event builder, keyed by column name.
     * Does nothing when {@code list} is null or empty.
     *
     * <p>NOTE(review): values are always written with {@code putAfter}, even when the caller
     * passes before-image rows - confirm this is intended.
     *
     * @param builder event builder receiving the values
     * @param list decoded rows (one string array per row)
     * @param list2 column names, index-aligned with each row array
     * @param i index of the row to copy
     */
    private void initData(Protobuf3.DuckulaEvent.Builder builder, List<String[]> list, List<String> list2, int i) {
        if (list == null || list.isEmpty()) {
            return;
        }
        final String[] values = list.get(i);
        int col = 0;
        while (col < values.length) {
            final String value = values[col];
            if (value != null) {
                builder.putAfter(list2.get(col), value);
            }
            col++;
        }
    }
}
