package com.alibaba.otter.canal.client.impl.running;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.common.utils.BooleanMutex;
import com.alibaba.otter.canal.common.utils.JsonUtils;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:BOOT-INF/lib/canal.client-1.0.25.jar:com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor.class */
/**
 * Tracks which client instance currently owns the "running" node of a canal destination in
 * ZooKeeper and tries to become that owner.
 *
 * <p>Ownership is claimed by creating an ephemeral node whose payload is the JSON form of this
 * client's {@link ClientRunningData}. A data listener on the same node reacts to payload changes
 * and to deletion of the node; on deletion the monitor tries to claim the node again, either
 * immediately or after {@code delayTime} seconds.
 *
 * <p>{@link #start()} subscribes and makes the first claim attempt; {@link #stop()} unsubscribes,
 * releases the node if this client owns it and shuts down the executor used for delayed retries.
 */
public class ClientRunningMonitor extends AbstractCanalLifeCycle {
    private static final Logger logger = LoggerFactory.getLogger(ClientRunningMonitor.class);

    private ZkClientx zkClient;
    private String destination;
    /** Identity of this client; its address is rewritten by {@link #processActiveEnter()}. */
    private ClientRunningData clientData;
    /** Last known payload of the running node (ours or another client's). */
    private volatile ClientRunningData activeData;
    private ClientRunningListener listener;
    /** Set to true while this client is known to own the running node. */
    private final BooleanMutex mutex = new BooleanMutex(false);
    /** Set once the node payload asked this client (active == false) to give up ownership. */
    private volatile boolean release = false;
    /** Runs delayed re-claim attempts; shut down in {@link #stop()} and recreated in {@link #start()}. */
    private volatile ScheduledExecutorService delayExecutor = Executors.newScheduledThreadPool(1);
    /** Delay, in seconds, before a delayed re-claim attempt. */
    private int delayTime = 5;

    private final IZkDataListener dataListener = new IZkDataListener() {
        /**
         * Reacts to a payload change of the running node: drops the mutex when another client owns
         * the node, and releases the node when the payload marks this client as inactive.
         */
        @Override
        public void handleDataChange(String dataPath, Object data) throws Exception {
            MDC.put("destination", destination);
            ClientRunningData runningData =
                    (ClientRunningData) JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
            boolean mine = isMine(runningData.getAddress());
            if (!mine) {
                mutex.set(false);
            }
            if (!runningData.isActive() && mine) {
                // The payload marks this client inactive: remember that and give the node up.
                release = true;
                releaseRunning();
            }
            // Assigned last on purpose: releaseRunning() -> check() also writes activeData.
            activeData = runningData;
        }

        /**
         * Reacts to deletion of the running node. If this client was the previous owner and was
         * not asked to release, it re-claims immediately; otherwise the attempt is delayed by
         * {@code delayTime} seconds (presumably to avoid rapid ownership flapping).
         */
        @Override
        public void handleDataDeleted(String dataPath) throws Exception {
            MDC.put("destination", destination);
            mutex.set(false);
            processActiveExit();

            // Single read of the volatile field so the null check and the use see the same value.
            ClientRunningData lastActive = activeData;
            boolean wasMine = lastActive != null && isMine(lastActive.getAddress());
            if (!release && wasMine) {
                initRunning();
                return;
            }

            try {
                delayExecutor.schedule(new Runnable() {
                    @Override
                    public void run() {
                        initRunning();
                    }
                }, delayTime, TimeUnit.SECONDS);
            } catch (RejectedExecutionException e) {
                // stop() already shut the executor down; initRunning() would be a no-op anyway
                // because it checks isStart(), so dropping the retry is safe.
                logger.info("delay executor is shut down, skip delayed initRunning for destination [{}]", destination);
            }
        }
    };

    /**
     * Starts the monitor: subscribes to the running node and makes a first attempt to claim it.
     */
    @Override
    public void start() {
        super.start();
        // stop() shuts the executor down; recreate it so a restarted monitor can schedule again.
        if (delayExecutor.isShutdown()) {
            delayExecutor = Executors.newScheduledThreadPool(1);
        }
        zkClient.subscribeDataChanges(runningPath(), dataListener);
        initRunning();
    }

    /**
     * Stops the monitor: unsubscribes from the running node, releases it if owned by this client,
     * and shuts down the delay executor so its worker thread does not outlive the monitor.
     */
    @Override
    public void stop() {
        super.stop();
        zkClient.unsubscribeDataChanges(runningPath(), dataListener);
        try {
            releaseRunning();
        } finally {
            // Without this the executor's non-daemon worker thread is leaked.
            delayExecutor.shutdown();
        }
    }

    /**
     * Tries to claim the running node for this client. Does nothing unless the monitor is started.
     *
     * <p>On success the listener's enter callback runs and the mutex is set to true. If the node
     * already exists, its payload is stored as the active data and the mutex is set to true only
     * when that payload carries this client's {@code host:port} address.
     *
     * @throws CanalClientException if any unexpected error occurs; the original error is the cause
     */
    public synchronized void initRunning() {
        if (!isStart()) {
            return;
        }
        String path = runningPath();
        byte[] bytes = JsonUtils.marshalToByte(clientData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            processActiveEnter();
            activeData = clientData;
            mutex.set(true);
        } catch (ZkNoNodeException e) {
            // Parent path is missing: create it (with parents) and try again.
            zkClient.createPersistent(
                    ZookeeperPathUtils.getClientIdNodePath(destination, clientData.getClientId()), true);
            initRunning();
        } catch (ZkNodeExistsException e) {
            byte[] existing = (byte[]) zkClient.readData(path, true);
            if (existing == null) {
                // Node vanished between create() and readData(): try to claim it again.
                initRunning();
                return;
            }
            ClientRunningData current =
                    (ClientRunningData) JsonUtils.unmarshalFromByte(existing, ClientRunningData.class);
            activeData = current;
            String address = current.getAddress();
            // Null-safe: a payload without an address is simply not ours.
            if (address != null && address.contains(":") && isMine(address)) {
                mutex.set(true);
            }
        } catch (Throwable t) {
            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].", destination), t);
            try {
                // Best-effort cleanup; must not hide the original failure.
                releaseRunning();
            } catch (RuntimeException releaseError) {
                logger.warn("release running node failed after initRunning error", releaseError);
            }
            throw new CanalClientException("something goes wrong in initRunning method. ", t);
        }
    }

    /**
     * Attempts to claim the running node and then waits on the internal mutex
     * (presumably until it is set to true, i.e. until this client is active).
     *
     * @throws InterruptedException if the wait on the mutex is interrupted
     */
    public void waitForActive() throws InterruptedException {
        initRunning();
        mutex.get();
    }

    /**
     * Reads the running node and reports whether it belongs to this client, refreshing the
     * cached active data as a side effect.
     *
     * @return {@code true} if the node's address equals this client's address; {@code false} if it
     *     belongs to someone else, does not exist, or the read failed
     */
    public boolean check() {
        String path = runningPath();
        // Loop instead of recursion so repeated interrupts cannot grow the stack.
        while (true) {
            try {
                ClientRunningData current =
                        (ClientRunningData) JsonUtils.unmarshalFromByte((byte[]) zkClient.readData(path), ClientRunningData.class);
                activeData = current;
                boolean mine = isMine(current.getAddress());
                if (!mine) {
                    logger.warn("canal is running in [{}] , but not in [{}]", current.getAddress(), clientData.getAddress());
                }
                return mine;
            } catch (ZkInterruptedException e) {
                logger.warn("canal check is interrupt");
                // Deliberately clears the interrupt flag so the retry below is not interrupted again.
                Thread.interrupted();
            } catch (ZkNoNodeException e) {
                logger.warn("canal is not run any in node");
                return false;
            } catch (ZkException e) {
                logger.warn("canal check is failed");
                return false;
            }
        }
    }

    /**
     * Deletes the running node if (and only if) it currently belongs to this client, then drops
     * the mutex and fires the listener's exit callback.
     *
     * @return {@code true} if the node was owned by this client and a delete was issued
     */
    public boolean releaseRunning() {
        if (!check()) {
            return false;
        }
        zkClient.delete(runningPath());
        mutex.set(false);
        processActiveExit();
        return true;
    }

    /**
     * Tells whether {@code str} is this client's address. Kept public because the decompiled
     * source widened it for use by the anonymous listener.
     *
     * @param str address read from a running node; may be {@code null}
     * @return {@code true} if {@code str} is non-null and equals this client's address
     */
    public boolean isMine(String str) {
        return str != null && str.equals(clientData.getAddress());
    }

    /**
     * Fires the listener's enter callback (if a listener is set), stores the returned
     * {@code host:port} as this client's address and writes the updated data to the running node.
     */
    private void processActiveEnter() {
        if (listener == null) {
            return;
        }
        InetSocketAddress connected = listener.processActiveEnter();
        clientData.setAddress(connected.getAddress().getHostAddress() + ":" + connected.getPort());
        zkClient.writeData(runningPath(), JsonUtils.marshalToByte(clientData));
    }

    /**
     * Fires the listener's exit callback, if a listener is set. Kept public because the
     * decompiled source widened it for use by the anonymous listener.
     */
    public void processActiveExit() {
        if (listener != null) {
            listener.processActiveExit();
        }
    }

    /** Returns the ZooKeeper path of the running node for this destination and client id. */
    private String runningPath() {
        return ZookeeperPathUtils.getDestinationClientRunning(destination, clientData.getClientId());
    }

    public void setListener(ClientRunningListener clientRunningListener) {
        this.listener = clientRunningListener;
    }

    public void setDestination(String str) {
        this.destination = str;
    }

    public void setClientData(ClientRunningData clientRunningData) {
        this.clientData = clientRunningData;
    }

    /**
     * Sets the delay applied before a delayed re-claim attempt.
     *
     * @param i delay in seconds
     */
    public void setDelayTime(int i) {
        this.delayTime = i;
    }

    public void setZkClient(ZkClientx zkClientx) {
        this.zkClient = zkClientx;
    }
}
