package com.alibaba.otter.canal.parse.inbound;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
import com.alibaba.otter.canal.filter.CanalEventFilter;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.protocol.position.LogIdentity;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;
import java.io.IOException;
import java.lang.Thread;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/alibaba/otter/canal/parse/inbound/AbstractEventParser.class */
public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {
    /** Authentication/connection info of the source being parsed; its address is used for log messages, thread names and log identities. */
    protected volatile AuthenticationInfo runningInfo;
    /** Destination name; placed in the logging MDC and passed to the sink and the log position manager. */
    protected String destination;
    /** Timer that runs the heartbeat task; created lazily by startHeartBeat and cancelled by stopHeartBeat. */
    protected volatile Timer timer;
    /** The currently scheduled heartbeat task, or null when none has been scheduled. */
    protected TimerTask heartBeatTimerTask;
    /** Logger bound to the concrete subclass. */
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    /** Used by the transaction flush callback to persist the last transaction position. */
    protected CanalLogPositionManager logPositionManager = null;
    /** Downstream sink that receives batches of parsed entries and heartbeat entries. */
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink = null;
    /** Only assigned through setEventFilter in this class; presumably consumed by subclasses — TODO confirm. */
    protected CanalEventFilter eventFilter = null;
    /** Only assigned through setEventBlackFilter in this class; presumably consumed by subclasses — TODO confirm. */
    protected CanalEventFilter eventBlackFilter = null;
    /** Optional alarm receiver; sendAlarm does nothing while this is null. */
    private CanalAlarmHandler alarmHandler = null;
    /** When true, the duration of each parse and each sink call is recorded. */
    protected AtomicBoolean profilingEnabled = new AtomicBoolean(false);
    /** Not updated anywhere in this class. */
    protected AtomicLong receivedEventCount = new AtomicLong();
    /** Incremented once per parseAndProfilingIfNecessary call; reset to 0 if it wraps negative. */
    protected AtomicLong parsedEventCount = new AtomicLong();
    /** Incremented once per consumeTheEventAndProfilingIfNecessary call; reset to 0 if it wraps negative. */
    protected AtomicLong consumedEventCount = new AtomicLong();
    /** Milliseconds taken by the most recent profiled parse call; -1 until one has been measured. */
    protected long parsingInterval = -1;
    /** Milliseconds taken by the most recent profiled sink call; -1 until one has been measured. */
    protected long processingInterval = -1;
    /** Parser that converts raw events into entries; replaced by buildParser() in start(). */
    protected BinlogParser binlogParser = null;
    /** Background thread running the connect/dump loop; created in start(). */
    protected Thread parseThread = null;
    /** Uncaught-exception handler installed on the parse thread; it only logs the error. */
    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { // from class: com.alibaba.otter.canal.parse.inbound.AbstractEventParser.1
        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            AbstractEventParser.this.logger.error("parse events has an error", th);
        }
    };
    /** Buffer size handed to the transaction buffer in start(). */
    protected int transactionSize = 1024;
    /** Set to true by the parse loop when a TableIdNotFoundException is caught. */
    protected AtomicBoolean needTransactionPosition = new AtomicBoolean(false);
    /** System.currentTimeMillis() of the last entry added to the buffer; 0 right after the heartbeat is started or stopped. */
    protected long lastEntryTime = 0;
    /** Only assigned through setDetectingEnable in this class; presumably consumed by subclasses — TODO confirm. */
    protected volatile boolean detectingEnable = true;
    /** Heartbeat period and idle threshold, in seconds. */
    protected Integer detectingIntervalInSeconds = 3;
    /** Last error seen by the parse loop; cleared when an entry is parsed successfully. */
    protected Throwable exception = null;
    /**
     * Transaction buffer whose flush callback hands each flushed batch to the sink and then
     * persists the position of the last transaction end found in that batch.
     */
    protected EventTransactionBuffer transactionBuffer = new EventTransactionBuffer(new EventTransactionBuffer.TransactionFlushCallback() {
        @Override
        public void flush(List<CanalEntry.Entry> list) throws InterruptedException {
            // The batch is always offered to the sink, even if the parser is being stopped.
            final boolean delivered = AbstractEventParser.this.consumeTheEventAndProfilingIfNecessary(list);

            // Once stopped, neither report a failure nor persist a position.
            if (!AbstractEventParser.this.running) {
                return;
            }
            if (!delivered) {
                throw new CanalParseException("consume failed!");
            }

            final LogPosition transactionEnd = AbstractEventParser.this.buildLastTransactionPosition(list);
            if (transactionEnd == null) {
                // No transaction end in this batch: nothing to persist.
                return;
            }
            AbstractEventParser.this.logPositionManager.persistLogPosition(AbstractEventParser.this.destination, transactionEnd);
        }
    });

    /**
     * Creates the parser used to turn raw events into entries.
     * Called once from start(); the result is stored in binlogParser and started.
     *
     * @return the parser to use; start() dereferences it immediately, so it must not be null
     */
    protected abstract BinlogParser buildParser();

    /**
     * Creates the connection used by the parse thread.
     * Called by the parse thread at the beginning of every iteration of its retry loop.
     *
     * @return the connection that will be connected, positioned and dumped from
     */
    protected abstract ErosaConnection buildErosaConnection();

    /**
     * Determines the position from which dumping should start.
     * Called by the parse thread after the connection has been connected.
     *
     * @param erosaConnection the already connected connection
     * @return the start position; the parse loop raises a CanalParseException when this is null
     * @throws IOException if the position lookup fails on I/O
     */
    protected abstract EntryPosition findStartPosition(ErosaConnection erosaConnection) throws IOException;

    /**
     * Hook invoked by the parse thread before the connection is connected.
     * The default implementation does nothing.
     *
     * @param erosaConnection the freshly built, not yet connected connection
     */
    protected void preDump(ErosaConnection erosaConnection) {
    }

    /**
     * Hook invoked by the parse loop once a start position has been found.
     * The default implementation always reports success.
     *
     * @param entryPosition the start position that was found
     * @return true to continue; the parse loop raises a CanalParseException on false
     */
    protected boolean processTableMeta(EntryPosition entryPosition) {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked by the parse thread after a dump attempt ends (normally or with an error),
     * just before the connection is disconnected. The default implementation does nothing.
     *
     * @param erosaConnection the connection that was used; may be null if building it failed
     */
    public void afterDump(ErosaConnection erosaConnection) {
    }

    /**
     * Forwards an alarm to the configured alarm handler.
     * Does nothing when no handler has been set.
     *
     * @param str  first argument passed through to the handler (the parse loop passes the destination)
     * @param str2 second argument passed through to the handler (the parse loop passes a stack trace)
     */
    public void sendAlarm(String str, String str2) {
        if (this.alarmHandler == null) {
            return;
        }
        this.alarmHandler.sendAlarm(str, str2);
    }

    /**
     * Starts the parser: starts the lifecycle, sizes and starts the transaction buffer, builds and
     * starts the binlog parser, then launches the parse thread.
     *
     * The parse thread loops while the parser is running. Each iteration builds a connection,
     * starts the heartbeat, connects, finds the start position and dumps events into the
     * transaction buffer; on failure it cleans up, waits, and retries.
     *
     * NOTE(review): this body looks like decompiler output. The cleanup sequence
     * (Thread.interrupted / afterDump / disconnect) is repeated in several places and presumably
     * originated from a single finally block; the statements from the null check on
     * findStartPosition down to the dump call presumably belonged inside the try — confirm
     * against the original source before restructuring.
     */
    public void start() {
        super.start();
        MDC.put("destination", this.destination);
        this.transactionBuffer.setBufferSize(this.transactionSize);
        this.transactionBuffer.start();
        this.binlogParser = buildParser();
        this.binlogParser.start();
        this.parseThread = new Thread(new Runnable() { // from class: com.alibaba.otter.canal.parse.inbound.AbstractEventParser.3
            @Override // java.lang.Runnable
            public void run() {
                final EntryPosition findStartPosition;
                MDC.put("destination", String.valueOf(AbstractEventParser.this.destination));
                ErosaConnection erosaConnection = null;
                // Retry loop: one connect/dump attempt per iteration until the parser is stopped.
                while (AbstractEventParser.this.running) {
                    try {
                        try {
                            erosaConnection = AbstractEventParser.this.buildErosaConnection();
                            AbstractEventParser.this.startHeartBeat(erosaConnection);
                            AbstractEventParser.this.preDump(erosaConnection);
                            erosaConnection.connect();
                            findStartPosition = AbstractEventParser.this.findStartPosition(erosaConnection);
                        } catch (Throwable th) {
                            // Cleanup before rethrowing to the outer handlers.
                            // Thread.interrupted() clears the thread's interrupt status.
                            Thread.interrupted();
                            AbstractEventParser.this.afterDump(erosaConnection);
                            if (erosaConnection != null) {
                                try {
                                    erosaConnection.disconnect();
                                } catch (IOException e) {
                                    // A disconnect failure is fatal only when the parser is no longer running.
                                    if (!AbstractEventParser.this.running) {
                                        throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e);
                                    }
                                    AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", AbstractEventParser.this.runningInfo.getAddress().toString(), e);
                                    throw th;
                                }
                            }
                            throw th;
                        }
                    } catch (TableIdNotFoundException e2) {
                        // Record the error and flag that a transaction position is needed, then clean up.
                        AbstractEventParser.this.exception = e2;
                        AbstractEventParser.this.needTransactionPosition.compareAndSet(false, true);
                        AbstractEventParser.this.logger.error(String.format("dump address %s has an error, retrying. caused by ", AbstractEventParser.this.runningInfo.getAddress().toString()), e2);
                        Thread.interrupted();
                        AbstractEventParser.this.afterDump(erosaConnection);
                        if (erosaConnection != null) {
                            try {
                                erosaConnection.disconnect();
                            } catch (IOException e3) {
                                if (!AbstractEventParser.this.running) {
                                    throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e3);
                                }
                                AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", AbstractEventParser.this.runningInfo.getAddress().toString(), e3);
                            }
                        }
                    } catch (Throwable th2) {
                        // Any other failure: let subclasses react, remember the error.
                        AbstractEventParser.this.processDumpError(th2);
                        AbstractEventParser.this.exception = th2;
                        if (AbstractEventParser.this.running) {
                            // Still running: log, raise an alarm, and fall through to cleanup and retry.
                            AbstractEventParser.this.logger.error(String.format("dump address %s has an error, retrying. caused by ", AbstractEventParser.this.runningInfo.getAddress().toString()), th2);
                            AbstractEventParser.this.sendAlarm(AbstractEventParser.this.destination, ExceptionUtils.getFullStackTrace(th2));
                        } else if (!(th2 instanceof ClosedByInterruptException) && !(th2.getCause() instanceof ClosedByInterruptException)) {
                            // Stopped, and the failure was not a ClosedByInterruptException: propagate it.
                            throw new CanalParseException(String.format("dump address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), th2);
                        }
                        Thread.interrupted();
                        AbstractEventParser.this.afterDump(erosaConnection);
                        if (erosaConnection != null) {
                            try {
                                erosaConnection.disconnect();
                            } catch (IOException e4) {
                                if (!AbstractEventParser.this.running) {
                                    throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e4);
                                }
                                AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", AbstractEventParser.this.runningInfo.getAddress().toString(), e4);
                            }
                        }
                    }
                    // A usable start position and table meta are mandatory before dumping.
                    if (findStartPosition == null) {
                        throw new CanalParseException("can't find start position for " + AbstractEventParser.this.destination);
                    }
                    if (!AbstractEventParser.this.processTableMeta(findStartPosition)) {
                        throw new CanalParseException("can't find init table meta for " + AbstractEventParser.this.destination + " with position : " + findStartPosition);
                    }
                    AbstractEventParser.this.logger.info("find start position : {}", findStartPosition.toString());
                    erosaConnection.reconnect();
                    // Per-event callback handed to dump(): parses the event and adds it to the transaction buffer.
                    SinkFunction<EVENT> sinkFunction = new SinkFunction<EVENT>() { // from class: com.alibaba.otter.canal.parse.inbound.AbstractEventParser.3.1
                        // Position of the last entry added to the buffer; used for error reporting.
                        private LogPosition lastPosition;

                        @Override // com.alibaba.otter.canal.parse.inbound.SinkFunction
                        public boolean sink(EVENT event) {
                            try {
                                CanalEntry.Entry parseAndProfilingIfNecessary = AbstractEventParser.this.parseAndProfilingIfNecessary(event, false);
                                // Returns false once the parser has been stopped.
                                if (!AbstractEventParser.this.running) {
                                    return false;
                                }
                                if (parseAndProfilingIfNecessary != null) {
                                    // A successfully parsed entry clears the recorded error.
                                    AbstractEventParser.this.exception = null;
                                    AbstractEventParser.this.transactionBuffer.add(parseAndProfilingIfNecessary);
                                    this.lastPosition = AbstractEventParser.this.buildLastPosition(parseAndProfilingIfNecessary);
                                    AbstractEventParser.this.lastEntryTime = System.currentTimeMillis();
                                }
                                return AbstractEventParser.this.running;
                            } catch (TableIdNotFoundException e5) {
                                // Propagated unchanged so the parse loop's dedicated handler sees it.
                                throw e5;
                            } catch (Throwable th3) {
                                // Unwrap a TableIdNotFoundException that arrives as the cause.
                                if (th3.getCause() instanceof TableIdNotFoundException) {
                                    throw ((TableIdNotFoundException) th3.getCause());
                                }
                                AbstractEventParser.this.processSinkError(th3, this.lastPosition, findStartPosition.getJournalName(), findStartPosition.getPosition().longValue());
                                throw new CanalParseException(th3);
                            }
                        }
                    };
                    // Dump by journal name/offset unless only a timestamp is available.
                    if (!StringUtils.isEmpty(findStartPosition.getJournalName()) || findStartPosition.getTimestamp() == null) {
                        erosaConnection.dump(findStartPosition.getJournalName(), findStartPosition.getPosition(), sinkFunction);
                    } else {
                        erosaConnection.dump(findStartPosition.getTimestamp().longValue(), sinkFunction);
                    }
                    // Cleanup after the dump call returns.
                    Thread.interrupted();
                    AbstractEventParser.this.afterDump(erosaConnection);
                    if (erosaConnection != null) {
                        try {
                            erosaConnection.disconnect();
                        } catch (IOException e5) {
                            if (!AbstractEventParser.this.running) {
                                throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", AbstractEventParser.this.runningInfo.getAddress().toString()), e5);
                            }
                            AbstractEventParser.this.logger.error("disconnect address {} has an error, retrying., caused by ", AbstractEventParser.this.runningInfo.getAddress().toString(), e5);
                        }
                    }
                    // Reset downstream state before the next attempt.
                    AbstractEventParser.this.eventSink.interrupt();
                    AbstractEventParser.this.transactionBuffer.reset();
                    AbstractEventParser.this.binlogParser.reset();
                    if (AbstractEventParser.this.running) {
                        try {
                            // Back off for 10 seconds plus a random amount below 10 seconds before retrying.
                            Thread.sleep(10000 + RandomUtils.nextInt(10000));
                        } catch (InterruptedException e6) {
                        }
                    }
                }
                MDC.remove("destination");
            }
        });
        this.parseThread.setUncaughtExceptionHandler(this.handler);
        Thread thread = this.parseThread;
        Object[] objArr = new Object[2];
        objArr[0] = this.destination;
        // The address is formatted null-safely: runningInfo may not be set yet.
        objArr[1] = this.runningInfo == null ? null : this.runningInfo.getAddress();
        thread.setName(String.format("destination = %s , address = %s , EventParser", objArr));
        this.parseThread.start();
    }

    /**
     * Stops the parser: stops the lifecycle and the heartbeat, interrupts the parse thread and
     * the sink, waits for the parse thread to finish, then stops the binlog parser and the
     * transaction buffer if they are still started.
     *
     * If the calling thread is interrupted while waiting for the parse thread, the wait is
     * abandoned, the remaining cleanup still runs, and the caller's interrupt status is restored
     * afterwards so the interruption is not silently lost.
     */
    public void stop() {
        super.stop();
        stopHeartBeat();
        this.parseThread.interrupt();
        this.eventSink.interrupt();

        boolean interrupted = false;
        try {
            try {
                this.parseThread.join();
            } catch (InterruptedException e) {
                // Remember the interruption; re-assert it only after cleanup so that
                // the stop calls below are not disturbed by a pending interrupt.
                interrupted = true;
            }
            if (this.binlogParser.isStart()) {
                this.binlogParser.stop();
            }
            if (this.transactionBuffer.isStart()) {
                this.transactionBuffer.stop();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Hands a batch of entries to the event sink and, when profiling is enabled, records how
     * long the sink call took in processingInterval.
     *
     * @param list the entries to deliver
     * @return whatever the sink returned for this batch
     * @throws CanalSinkException   propagated from the sink
     * @throws InterruptedException propagated from the sink
     */
    protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> list) throws CanalSinkException, InterruptedException {
        final boolean profiling = getProfilingEnabled().booleanValue();
        final long startedAt = profiling ? System.currentTimeMillis() : -1L;

        final boolean accepted = this.eventSink.sink(list, this.runningInfo == null ? null : this.runningInfo.getAddress(), this.destination);

        if (profiling) {
            this.processingInterval = System.currentTimeMillis() - startedAt;
        }

        // Restart the counter from zero if it has wrapped around to a negative value.
        final long consumedSoFar = this.consumedEventCount.incrementAndGet();
        if (consumedSoFar < 0) {
            this.consumedEventCount.set(0L);
        }
        return accepted;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Parses one raw event with the binlog parser and, when profiling is enabled, records how
     * long the parse took in parsingInterval.
     *
     * @param event the raw event to parse
     * @param z     flag passed through unchanged to the binlog parser
     * @return the parser's result for this event (the parse loop treats null as "nothing to buffer")
     * @throws Exception propagated from the binlog parser
     */
    public CanalEntry.Entry parseAndProfilingIfNecessary(EVENT event, boolean z) throws Exception {
        final boolean profiling = getProfilingEnabled().booleanValue();
        final long startedAt = profiling ? System.currentTimeMillis() : -1L;

        final CanalEntry.Entry parsed = this.binlogParser.parse(event, z);

        if (profiling) {
            this.parsingInterval = System.currentTimeMillis() - startedAt;
        }

        // Restart the counter from zero if it has wrapped around to a negative value.
        final long parsedSoFar = this.parsedEventCount.incrementAndGet();
        if (parsedSoFar < 0) {
            this.parsedEventCount.set(0L);
        }
        return parsed;
    }

    /**
     * @return the current value of the profiling flag, boxed
     */
    public Boolean getProfilingEnabled() {
        return this.profilingEnabled.get() ? Boolean.TRUE : Boolean.FALSE;
    }

    /**
     * Finds the last TRANSACTIONEND entry in a batch and builds a log position from it.
     *
     * The batch is scanned from the end towards the beginning, including the first element,
     * so a batch consisting of a single TRANSACTIONEND entry still yields a position.
     *
     * @param list the flushed batch of entries
     * @return the position of the last transaction end, or null if the batch contains none
     */
    protected LogPosition buildLastTransactionPosition(List<CanalEntry.Entry> list) {
        // Scan down to index 0 inclusive; stopping at index 1 would miss a transaction end
        // that is the only (or first) entry of the batch.
        for (int index = list.size() - 1; index >= 0; index--) {
            CanalEntry.Entry entry = list.get(index);
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                return buildLastPosition(entry);
            }
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a log position from an entry's header: log file name, log file offset, execute
     * time and server id, tagged with the current source address.
     *
     * @param entry the entry whose header supplies the position data
     * @return a new log position for that entry
     */
    public LogPosition buildLastPosition(CanalEntry.Entry entry) {
        LogPosition result = new LogPosition();

        EntryPosition where = new EntryPosition();
        where.setJournalName(entry.getHeader().getLogfileName());
        where.setPosition(Long.valueOf(entry.getHeader().getLogfileOffset()));
        where.setTimestamp(Long.valueOf(entry.getHeader().getExecuteTime()));
        where.setServerId(Long.valueOf(entry.getHeader().getServerId()));
        result.setPostion(where);

        LogIdentity source = new LogIdentity(this.runningInfo.getAddress(), -1L);
        result.setIdentity(source);
        return result;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Logs a warning for an event that failed during sinking, including the best known position.
     *
     * @param th          the failure
     * @param logPosition position of the last successfully buffered entry, or null if there is none
     * @param str         journal name of the start position, used when logPosition is null
     * @param j           offset of the start position, used when logPosition is null
     */
    public void processSinkError(Throwable th, LogPosition logPosition, String str, long j) {
        final String message;
        if (logPosition == null) {
            // Nothing was buffered yet: fall back to the start position.
            message = String.format("ERROR ## parse this event has an error , last position : [%s,%s]", str, Long.valueOf(j));
        } else {
            message = String.format("ERROR ## parse this event has an error , last position : [%s]", logPosition.getPostion());
        }
        this.logger.warn(message, th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked by the parse thread when a dump attempt fails with anything other than a
     * TableIdNotFoundException. The default implementation does nothing.
     *
     * @param th the failure that ended the dump attempt
     */
    public void processDumpError(Throwable th) {
    }

    /**
     * Starts the heartbeat: resets lastEntryTime, lazily creates the daemon timer, and schedules
     * the heartbeat task at a fixed period of detectingIntervalInSeconds unless one is already
     * scheduled.
     *
     * @param erosaConnection connection passed on to buildHeartBeatTimeTask
     */
    protected void startHeartBeat(ErosaConnection erosaConnection) {
        this.lastEntryTime = 0L;
        if (this.timer == null) {
            // Format the address through %s so a missing runningInfo or a null address yields
            // "null" in the timer name instead of a NullPointerException (same as in start()).
            String timerName = String.format("destination = %s , address = %s , HeartBeatTimeTask",
                this.destination,
                this.runningInfo == null ? null : this.runningInfo.getAddress());
            synchronized (AbstractEventParser.class) {
                // Re-check under the lock so only one timer is created.
                if (this.timer == null) {
                    this.timer = new Timer(timerName, true);
                }
            }
        }
        if (this.heartBeatTimerTask == null) {
            // Only schedule when no task exists yet, to avoid duplicate heartbeat tasks.
            this.heartBeatTimerTask = buildHeartBeatTimeTask(erosaConnection);
            // Multiply as long: an int product overflows for large intervals and would hand
            // Timer.schedule a negative delay/period.
            long periodMillis = this.detectingIntervalInSeconds.intValue() * 1000L;
            this.timer.schedule(this.heartBeatTimerTask, periodMillis, periodMillis);
            this.logger.info("start heart beat.... ");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the timer task that emits a HEARTBEAT entry to the sink whenever no entry has been
     * buffered for at least detectingIntervalInSeconds. Failures inside the task are logged and
     * never propagate to the timer.
     *
     * @param erosaConnection not used by this default implementation
     * @return a new heartbeat task
     */
    public TimerTask buildHeartBeatTimeTask(ErosaConnection erosaConnection) {
        return new TimerTask() {
            @Override
            public void run() {
                try {
                    // Skip while an error is recorded and no entry has arrived yet.
                    if (AbstractEventParser.this.exception != null && AbstractEventParser.this.lastEntryTime <= 0) {
                        return;
                    }

                    final long now = System.currentTimeMillis();
                    final long idleSeconds = (now - AbstractEventParser.this.lastEntryTime) / 1000;
                    if (idleSeconds < AbstractEventParser.this.detectingIntervalInSeconds.intValue()) {
                        return;
                    }

                    // Heartbeat entry: a header carrying only the current time.
                    CanalEntry.Header.Builder headerBuilder = CanalEntry.Header.newBuilder();
                    headerBuilder.setExecuteTime(now);
                    CanalEntry.Entry.Builder entryBuilder = CanalEntry.Entry.newBuilder();
                    entryBuilder.setHeader(headerBuilder.build());
                    entryBuilder.setEntryType(CanalEntry.EntryType.HEARTBEAT);

                    AbstractEventParser.this.consumeTheEventAndProfilingIfNecessary(Arrays.asList(entryBuilder.build()));
                } catch (Throwable failure) {
                    AbstractEventParser.this.logger.warn("heartBeat run failed ", failure);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the heartbeat: resets lastEntryTime, cancels and discards the timer if one exists,
     * and forgets the scheduled task so a later startHeartBeat schedules a fresh one.
     */
    public void stopHeartBeat() {
        this.lastEntryTime = 0L;
        if (this.timer != null) {
            // Timer.cancel() discards all scheduled tasks; a cancelled timer cannot be reused,
            // so the reference is dropped and startHeartBeat will create a new one.
            this.timer.cancel();
            this.timer = null;
        }
        this.heartBeatTimerTask = null;
    }

    /**
     * Sets the event filter. This class only stores it.
     *
     * @param canalEventFilter the filter to store
     */
    public void setEventFilter(CanalEventFilter canalEventFilter) {
        this.eventFilter = canalEventFilter;
    }

    /**
     * Sets the black-list event filter. This class only stores it.
     *
     * @param canalEventFilter the filter to store
     */
    public void setEventBlackFilter(CanalEventFilter canalEventFilter) {
        this.eventBlackFilter = canalEventFilter;
    }

    /**
     * @return the current value of the parsed-event counter
     */
    public Long getParsedEventCount() {
        return Long.valueOf(this.parsedEventCount.get());
    }

    /**
     * @return the current value of the consumed-event counter
     */
    public Long getConsumedEventCount() {
        return Long.valueOf(this.consumedEventCount.get());
    }

    /**
     * Enables or disables profiling of parse and sink calls.
     *
     * The existing AtomicBoolean is updated in place rather than replaced, so the change is
     * visible to the parse and heartbeat threads and to any code holding a reference to the flag.
     *
     * @param z true to record parsing/processing intervals, false to stop recording them
     */
    public void setProfilingEnabled(boolean z) {
        this.profilingEnabled.set(z);
    }

    /**
     * @return milliseconds taken by the most recent profiled parse call, or -1 if none was measured
     */
    public long getParsingInterval() {
        return this.parsingInterval;
    }

    /**
     * @return milliseconds taken by the most recent profiled sink call, or -1 if none was measured
     */
    public long getProcessingInterval() {
        return this.processingInterval;
    }

    /**
     * @param canalEventSink the sink that will receive parsed entries and heartbeats
     */
    public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> canalEventSink) {
        this.eventSink = canalEventSink;
    }

    /**
     * @param str the destination name used for logging context, sinking and position persistence
     */
    public void setDestination(String str) {
        this.destination = str;
    }

    /**
     * Sets the binlog parser. Note that start() overwrites this field with buildParser().
     *
     * @param binlogParser the parser to store
     */
    public void setBinlogParser(BinlogParser binlogParser) {
        this.binlogParser = binlogParser;
    }

    /**
     * @return the current binlog parser, or null if none has been set or built yet
     */
    public BinlogParser getBinlogParser() {
        return this.binlogParser;
    }

    /**
     * @param canalAlarmHandler the handler that sendAlarm forwards to; null disables alarms
     */
    public void setAlarmHandler(CanalAlarmHandler canalAlarmHandler) {
        this.alarmHandler = canalAlarmHandler;
    }

    /**
     * @return the configured alarm handler, or null if none is set
     */
    public CanalAlarmHandler getAlarmHandler() {
        return this.alarmHandler;
    }

    /**
     * @param canalLogPositionManager the manager used to persist transaction positions
     */
    public void setLogPositionManager(CanalLogPositionManager canalLogPositionManager) {
        this.logPositionManager = canalLogPositionManager;
    }

    /**
     * @param i the buffer size applied to the transaction buffer when start() is called
     */
    public void setTransactionSize(int i) {
        this.transactionSize = i;
    }

    /**
     * @return the configured log position manager, or null if none is set
     */
    public CanalLogPositionManager getLogPositionManager() {
        return this.logPositionManager;
    }

    /**
     * Sets the detectingEnable flag. This class only stores it.
     *
     * @param z the new flag value
     */
    public void setDetectingEnable(boolean z) {
        this.detectingEnable = z;
    }

    /**
     * @param num heartbeat period and idle threshold in seconds; it is unboxed when the heartbeat
     *            is scheduled and run, so it must not be null
     */
    public void setDetectingIntervalInSeconds(Integer num) {
        this.detectingIntervalInSeconds = num;
    }

    /**
     * @return the last error recorded by the parse loop, or null if none is recorded
     */
    public Throwable getException() {
        return this.exception;
    }
}
