package net.wicp.tams.common.flink.source.binlog;

import java.util.ArrayList;
import java.util.List;
import net.wicp.tams.common.binlog.alone.BusiAssit;
import net.wicp.tams.common.binlog.alone.ListenerConf;
import net.wicp.tams.common.binlog.alone.parser.ParseLogOnline;
import net.wicp.tams.duckula.client.Protobuf3;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wicp/tams/common/flink/source/binlog/BinlogSource.class */
/**
 * Flink source function that drives a {@link ParseLogOnline} binlog reader and exposes the
 * reader's current checkpoint through Flink's {@link ListCheckpointed} mechanism.
 *
 * <p>The listener and checkpoint implementations are fixed by this class: every connection
 * configuration passed in is overwritten with {@link #listener} and {@link #chk} before it is
 * built, so that events are routed through {@code FlinkBinlogListener}.
 *
 * <p>Lifecycle as implemented here: {@link #restoreState(List)} (optional) rewrites the start
 * position in the connection configuration, {@link #open(Configuration)} creates the reader from
 * that configuration, {@link #run(SourceFunction.SourceContext)} blocks in the reader, and
 * {@link #cancel()} closes it.
 */
public class BinlogSource extends RichSourceFunction<Protobuf3.DuckulaEvent> implements ListCheckpointed<ListenerConf.CheckPoint> {
    private static final Logger log = LoggerFactory.getLogger(BinlogSource.class);
    private static final long serialVersionUID = 1;

    /** Fully-qualified class name of the listener that is forced onto every configuration. */
    private static final String listener = "net.wicp.tams.common.flink.source.binlog.FlinkBinlogListener";

    /** Fully-qualified class name of the checkpoint implementation forced onto every configuration. */
    private static final String chk = "net.wicp.tams.common.binlog.alone.checkpoint.CheckPointMemory";

    /** Connection configuration; replaced in {@link #restoreState(List)} when a position is restored. */
    private ListenerConf.ConnConf connConf;

    /** Binlog reader, created in {@link #open(Configuration)}; {@code null} until then. */
    private ParseLogOnline logFetcher;

    /** Column history taken from a restored checkpoint; {@code null} when nothing was restored. */
    private List<ListenerConf.ColHis> colsList;

    /**
     * Creates a source from an already prepared configuration builder.
     *
     * <p>Note that the passed builder itself is modified: its listener and checkpoint class
     * names are overwritten before the configuration is built.
     *
     * @param builder connection configuration builder to finalize
     */
    public BinlogSource(ListenerConf.ConnConf.Builder builder) {
        builder.setListener(listener);
        builder.setChk(chk);
        log.info("====设置chk:{}", chk);
        this.connConf = builder.build();
    }

    /**
     * Creates a source from the configuration that {@link BusiAssit#configMap(String)} returns
     * for the given key.
     *
     * @param str key handed to {@code BusiAssit.configMap} (presumably a named configuration
     *            group; confirm against {@code BusiAssit})
     */
    public BinlogSource(String str) {
        // Delegating keeps the listener/chk override and the log line in a single place.
        this(BusiAssit.configMap(str));
    }

    /** Creates a source using the configuration key {@code "default"}. */
    public BinlogSource() {
        this("default");
    }

    /**
     * Passes the runtime context straight to the superclass; kept for interface compatibility.
     *
     * @param runtimeContext runtime context supplied by Flink
     */
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        super.setRuntimeContext(runtimeContext);
    }

    /**
     * Creates the binlog reader from the current connection configuration.
     *
     * @param configuration configuration supplied by Flink, forwarded to the superclass
     * @throws Exception if the listener or checkpoint class cannot be loaded, or the superclass
     *                   or reader construction fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        // Load both classes through the user-code class loader up front so that a missing class
        // fails here with a ClassNotFoundException. The loaded classes are not used directly;
        // presumably ParseLogOnline resolves them by name later (confirm against ParseLogOnline).
        getRuntimeContext().getUserCodeClassLoader().loadClass(listener);
        getRuntimeContext().getUserCodeClassLoader().loadClass(chk);
        this.logFetcher = new ParseLogOnline(this.connConf.toBuilder());
    }

    /**
     * Returns the reader's current checkpoint as a single-element list.
     *
     * @param j  checkpoint id supplied by Flink (unused)
     * @param j2 checkpoint timestamp supplied by Flink (unused)
     * @return a mutable list holding the value of {@code logFetcher.getCheckPointCur()}
     * @throws Exception declared by the interface
     */
    public List<ListenerConf.CheckPoint> snapshotState(long j, long j2) throws Exception {
        List<ListenerConf.CheckPoint> checkPoints = new ArrayList<>(1);
        checkPoints.add(this.logFetcher.getCheckPointCur());
        return checkPoints;
    }

    /**
     * Restores the start position and column history from the first checkpoint in the list.
     *
     * <p>When the list is {@code null}, empty, or its first element is {@code null}, nothing is
     * restored and the configured start position is kept.
     *
     * @param list checkpoints handed back by Flink
     * @throws Exception declared by the interface
     */
    public void restoreState(List<ListenerConf.CheckPoint> list) throws Exception {
        // Flink may hand an instance an empty state list; list.get(0) would then throw.
        if (list == null || list.isEmpty() || list.get(0) == null) {
            log.info("no binlog checkpoint to restore, keeping the configured start position");
            return;
        }
        if (list.size() > 1) {
            // Only the first entry has ever been honoured; make the dropped ones visible.
            log.warn("got {} binlog checkpoints to restore, only the first one is used", list.size());
        }
        ListenerConf.CheckPoint checkPoint = list.get(0);
        ListenerConf.Position pos = checkPoint.getPos();
        log.info("the binlog begin from:{}", pos.getGtids());
        // ConnConf is rebuilt through its builder, so replace the field with the new instance.
        ListenerConf.ConnConf.Builder builder = this.connConf.toBuilder();
        builder.setPos(pos);
        this.connConf = builder.build();
        this.colsList = checkPoint.getColsList();
    }

    /**
     * Wires the Flink source context into the listener and starts reading the binlog.
     *
     * @param sourceContext context handed to the {@code FlinkBinlogListener}
     * @throws Exception if reading fails
     */
    public void run(SourceFunction.SourceContext<Protobuf3.DuckulaEvent> sourceContext) throws Exception {
        // The listener class is fixed in the constructors, which is what makes this cast valid.
        ((FlinkBinlogListener) this.logFetcher.getBinlogListener()).setCtx(sourceContext);
        // colsList is null unless restoreState supplied a column history.
        this.logFetcher.setColHis(this.colsList);
        this.logFetcher.read();
    }

    /** Closes the binlog reader if it has been created. */
    public void cancel() {
        if (this.logFetcher != null) {
            log.info("============cancel the logFetcher");
            this.logFetcher.close();
        }
    }
}
