package com.zhidian.cloud.canal.canal;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.zhidian.cloud.canal.model.Packet;
import com.zhidian.cloud.canal.model.Pipeline;
import com.zhidian.cloud.canal.model.Sink;
import com.zhidian.cloud.canal.storage.SinkStorage;
import com.zhidian.cloud.common.core.mq.MqV2Service;
import com.zhidian.cloud.common.logger.Logger;
import com.zhidian.cloud.common.utils.common.JsonResult;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:BOOT-INF/classes/com/zhidian/cloud/canal/canal/SimpleCanalSinker.class */
/**
 * Delivers canal change packets to the message-queue sink configured for a pipeline.
 *
 * <p>Three sink types are handled: RabbitMQ ({@code Sink.TYPE_RABBIT}), ActiveMQ
 * ({@code Sink.TYPE_ACTIVE}) and the HTTP-style middle service ({@code Sink.TYPE_MIDDLE}).
 * Broker connections, channels and sessions are cached per sink id and are torn down
 * whenever a send fails, so that the next retry starts from a fresh connection.
 *
 * <p>Cache lookups are safe to perform from several threads. NOTE(review): a cached AMQP
 * channel / JMS session is still handed to every thread that sends to the same sink, as
 * in the original code; confirm that callers are effectively single-threaded per sink.
 */
public class SimpleCanalSinker {
    /** Separates the sink key from the queue name inside {@link #queueMap} keys. */
    private static final String QUEUE_KEY_SEPARATOR = "|";

    /** Upper bound, in milliseconds, for tearing down a RabbitMQ connection. */
    private static final int RABBIT_ABORT_TIMEOUT_MS = 5000;

    /** How often a halted sink re-checks whether its failure counter has been reset. */
    private static final long HALTED_POLL_INTERVAL_MS = 600000L;

    /** Serialises creation and teardown of connections, channels, sessions and queue declarations. */
    private final Object resourceLock = new Object();

    /** Guards the plain maps {@link #sinkReconnectCounters} and {@link #middleQueues}; no I/O happens under it. */
    private final Object bookkeepingLock = new Object();

    /** Sinks loaded from storage; replaced wholesale by {@link #refreshSinks()}. */
    volatile List<Sink> sinks;

    /**
     * Most recently opened ActiveMQ session. Sessions are tracked per sink in
     * {@link #activeSessions}; this field is only kept up to date for same-package code
     * that may still read it.
     */
    Session session;
    SinkStorage sinkStorage;
    MqV2Service mqV2Service;
    Logger logger = Logger.getLogger(getClass());

    /** Failed-send counters keyed by sink id. */
    Map<String, AtomicInteger> sinkReconnectCounters = new HashMap<>();

    /** Open RabbitMQ connections keyed by sink key. */
    Map<String, Connection> rabbitConnections = new ConcurrentHashMap<>();

    /** Open ActiveMQ connections keyed by sink key. */
    Map<String, javax.jms.Connection> activeConnections = new ConcurrentHashMap<>();

    /** Open ActiveMQ sessions keyed by sink key. */
    Map<String, Session> activeSessions = new ConcurrentHashMap<>();

    /** Queue names already handled for the middle service, keyed by sink id. */
    Map<String, Set<String>> middleQueues = new HashMap<>();

    /** Open RabbitMQ channels keyed by sink key. */
    Map<String, Channel> channelMap = new ConcurrentHashMap<>();

    /** RabbitMQ queues already declared; key is sink key + separator + queue name. */
    Map<String, String> queueMap = new ConcurrentHashMap<>();
    ObjectMapper objectMapper = new ObjectMapper();

    /** Loads the sink list from storage. Must be called before {@link #sink(Packet, Pipeline)}. */
    public void init() {
        loadSink();
    }

    private void loadSink() {
        this.sinks = this.sinkStorage.list();
    }

    /**
     * Reloads the sink list and discards every cached resource.
     *
     * <p>Cached broker connections are closed instead of merely being forgotten, and the
     * channel, session and declared-queue caches are cleared with them so nothing keeps
     * pointing at a closed connection. Failure counters are zeroed before being dropped,
     * which releases any thread that is parked in the "halted" wait of
     * {@link #sink(Packet, Pipeline)} the next time it polls.
     */
    public void refreshSinks() {
        List<Sink> reloaded = this.sinkStorage.list();
        this.sinks = reloaded;

        synchronized (this.bookkeepingLock) {
            for (AtomicInteger counter : this.sinkReconnectCounters.values()) {
                counter.set(0);
            }
            this.sinkReconnectCounters.clear();
            this.middleQueues.clear();
        }

        synchronized (this.resourceLock) {
            for (Map.Entry<String, Connection> entry : this.rabbitConnections.entrySet()) {
                abortRabbitConnection("sink[" + entry.getKey() + "]", entry.getValue());
            }
            this.rabbitConnections.clear();
            this.channelMap.clear();
            this.queueMap.clear();

            for (Map.Entry<String, Session> entry : this.activeSessions.entrySet()) {
                closeActiveSession("sink[" + entry.getKey() + "]", entry.getValue());
            }
            this.activeSessions.clear();
            this.session = null;

            for (Map.Entry<String, javax.jms.Connection> entry : this.activeConnections.entrySet()) {
                closeActiveConnection("sink[" + entry.getKey() + "]", entry.getValue());
            }
            this.activeConnections.clear();
        }
    }

    /**
     * Sends {@code packet} to every enabled sink whose id equals the pipeline's sink id,
     * retrying until the send succeeds.
     *
     * @param packet   payload, serialised to JSON by the individual senders
     * @param pipeline routing information (sink id, queue, topic, routing key)
     */
    public void sink(Packet<?> packet, Pipeline pipeline) {
        // Iterate over a snapshot so a concurrent refreshSinks() cannot swap the list mid-loop.
        List<Sink> snapshot = this.sinks;
        for (Sink sink : snapshot) {
            // Boolean.TRUE.equals treats a null "enabled" flag as disabled instead of throwing.
            if (Boolean.TRUE.equals(sink.getEnabled()) && Objects.equals(sink.getId(), pipeline.getSinkId())) {
                deliverWithRetry(packet, pipeline, sink);
            }
        }
    }

    /**
     * Sends one packet to one sink, sleeping and retrying while the send reports failure.
     *
     * <p>A reconnect count of {@code -1} retries forever. Otherwise, once the number of
     * failures exceeds the reconnect count the thread parks, polling every
     * {@link #HALTED_POLL_INTERVAL_MS} ms until the counter is reset by
     * {@link #refreshSinks()}. The counter is reset after a successful send so that it
     * counts consecutive failures rather than failures over the process lifetime.
     */
    private void deliverWithRetry(Packet<?> packet, Pipeline pipeline, Sink sink) {
        boolean failedBefore = false;
        Boolean sent = sendMessage(packet, pipeline, sink);
        // A null result (unknown sink type) ends the loop, exactly like a success.
        while (Boolean.FALSE.equals(sent)) {
            failedBefore = true;
            AtomicInteger failures = getSinkReconnectCounter(sink.getId());
            failures.incrementAndGet();
            int limit = sink.getReconnectCount().intValue();
            if (limit == -1 || limit >= failures.get()) {
                this.logger.warn(String.format("%s[%s] 发送失败，%s 毫秒后重新发送", sink.getType(), sink.getUrl(), sink.getReconnectInterval()));
                sleepx(sink.getReconnectInterval().longValue());
            } else {
                this.logger.error(String.format("%s[%s] 不可用，停止发送，请手动重启", sink.getType(), sink.getUrl()));
                while (sink.getReconnectCount().intValue() < failures.get()) {
                    sleepx(HALTED_POLL_INTERVAL_MS);
                }
            }
            sent = sendMessage(packet, pipeline, sink);
        }
        if (failedBefore && Boolean.TRUE.equals(sent)) {
            getSinkReconnectCounter(sink.getId()).set(0);
        }
    }

    /**
     * Returns the failure counter for a sink, creating it on first use.
     *
     * @param str sink id
     * @return the shared counter for that sink id, never {@code null}
     */
    public AtomicInteger getSinkReconnectCounter(String str) {
        synchronized (this.bookkeepingLock) {
            AtomicInteger counter = this.sinkReconnectCounters.get(str);
            if (counter == null) {
                counter = new AtomicInteger(0);
                this.sinkReconnectCounters.put(str, counter);
            }
            return counter;
        }
    }

    /**
     * Dispatches to the sender matching the sink type.
     *
     * @return {@code TRUE} on success, {@code FALSE} on failure, {@code null} when the
     *         sink type is not one of the three known types
     */
    private Boolean sendMessage(Packet<?> packet, Pipeline pipeline, Sink sink) {
        if (Objects.equals(sink.getType(), Sink.TYPE_RABBIT)) {
            return Boolean.valueOf(sendToRabbit(packet, pipeline, sink));
        }
        if (Objects.equals(sink.getType(), Sink.TYPE_ACTIVE)) {
            return Boolean.valueOf(sendToActive(packet, pipeline, sink));
        }
        if (Objects.equals(sink.getType(), Sink.TYPE_MIDDLE)) {
            return Boolean.valueOf(sendToMiddle(packet, pipeline, sink));
        }
        return null;
    }

    /** Cache key for a sink; a null id maps to the string "null" because the concurrent maps reject null keys. */
    private static String sinkKey(Sink sink) {
        return String.valueOf(sink.getId());
    }

    /**
     * Publishes the packet as persistent JSON to RabbitMQ.
     *
     * <p>Routing: a non-empty queue publishes to that durable queue through the default
     * exchange; otherwise a non-empty routing key publishes to a {@code topic} exchange;
     * otherwise the message goes to a durable {@code fanout} exchange named after the topic.
     *
     * @return {@code true} when the publish call returned normally, {@code false} on any
     *         exception (the sink's cached connection and channel are discarded first)
     */
    private boolean sendToRabbit(Packet<?> packet, Pipeline pipeline, Sink sink) {
        String key = sinkKey(sink);
        try {
            Channel channel = getChannel(key, getRabbitConnection(sink));
            byte[] body = this.objectMapper.writeValueAsString(packet).getBytes(StandardCharsets.UTF_8);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .contentType("application/json")
                    .deliveryMode(2) // 2 = persistent
                    .build();
            String queue = pipeline.getQueue();
            String topic = pipeline.getTopic();
            String routingKey = pipeline.getRoutingKey();

            if (StringUtils.isNotEmpty(queue)) {
                declareQueueOnce(key, queue, channel);
                channel.basicPublish("", queue, properties, body);
                return true;
            }
            // An empty topic also takes this branch, matching the original control flow.
            // NOTE(review): the broker presumably rejects that declare, so such a pipeline
            // keeps failing until its configuration is fixed.
            if (StringUtils.isEmpty(topic) || StringUtils.isNotEmpty(routingKey)) {
                channel.exchangeDeclare(topic, "topic");
                channel.basicPublish(topic, routingKey, properties, body);
                return true;
            }
            // NOTE(review): still printed to stdout as before; move to the project logger
            // once an info-level method on it is confirmed.
            System.out.println("即将把消息发送到Exchange为" + topic + "上面");
            channel.exchangeDeclare(topic, "fanout", true);
            channel.basicPublish(topic, "", properties, body);
            return true;
        } catch (Exception e) {
            this.logger.warn(String.format("%s[%s] 连接失败", sink.getType(), sink.getUrl()), e);
            discardRabbitResources(String.format("%s[%s]", sink.getType(), sink.getUrl()), key);
            return false;
        }
    }

    /**
     * Declares a durable queue the first time it is used on a given sink.
     *
     * @throws IOException if the declaration fails; the caller treats that as a failed send
     */
    private void declareQueueOnce(String sinkKey, String queue, Channel channel) throws IOException {
        String cacheKey = sinkKey + QUEUE_KEY_SEPARATOR + queue;
        if (this.queueMap.containsKey(cacheKey)) {
            return;
        }
        synchronized (this.resourceLock) {
            if (!this.queueMap.containsKey(cacheKey)) {
                channel.queueDeclare(queue, true, false, false, null);
                this.queueMap.put(cacheKey, queue);
            }
        }
    }

    /** Returns the cached RabbitMQ connection for the sink, opening one if none is cached. */
    private Connection getRabbitConnection(Sink sink) {
        String key = sinkKey(sink);
        Connection connection = this.rabbitConnections.get(key);
        if (connection == null) {
            synchronized (this.resourceLock) {
                connection = this.rabbitConnections.get(key);
                if (connection == null) {
                    connection = createRabbitConnection(sink);
                    this.rabbitConnections.put(key, connection);
                }
            }
        }
        return connection;
    }

    /**
     * Opens a new RabbitMQ connection from the sink's credentials, host and virtual host.
     *
     * @throws RuntimeException wrapping whatever the client library threw
     */
    private Connection createRabbitConnection(Sink sink) {
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername(sink.getUsername());
            connectionFactory.setPassword(sink.getPassword());
            connectionFactory.setHost(sink.getUrl());
            connectionFactory.setVirtualHost(sink.getVirtualHost());
            return connectionFactory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException(String.format("%s[%s] 连接失败", sink.getType(), sink.getUrl()), e);
        }
    }

    /**
     * Returns the cached channel for a sink, opening one on {@code connection} if needed.
     *
     * @param str        sink key the channel is cached under
     * @param connection connection to open the channel on when none is cached
     * @throws IOException if the channel cannot be opened
     */
    private Channel getChannel(String str, Connection connection) throws IOException {
        Channel channel = this.channelMap.get(str);
        if (channel == null) {
            synchronized (this.resourceLock) {
                channel = this.channelMap.get(str);
                if (channel == null) {
                    channel = connection.createChannel();
                    if (channel == null) {
                        // createChannel() returns null when the connection has no free channel number.
                        throw new IOException("no channel available on connection");
                    }
                    this.channelMap.put(str, channel);
                }
            }
        }
        return channel;
    }

    /** Forgets and closes the connection, channel and declared-queue entries of one RabbitMQ sink. */
    private void discardRabbitResources(String label, String sinkKey) {
        Connection stale;
        synchronized (this.resourceLock) {
            stale = this.rabbitConnections.remove(sinkKey);
            this.channelMap.remove(sinkKey);
            // Dropping a few extra entries on a prefix clash only costs a harmless re-declare.
            String prefix = sinkKey + QUEUE_KEY_SEPARATOR;
            for (Iterator<String> keys = this.queueMap.keySet().iterator(); keys.hasNext();) {
                if (keys.next().startsWith(prefix)) {
                    keys.remove();
                }
            }
        }
        abortRabbitConnection(label, stale);
    }

    /** Closes a RabbitMQ connection (and with it its channels) without letting errors escape. */
    private void abortRabbitConnection(String label, Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.abort(RABBIT_ABORT_TIMEOUT_MS);
        } catch (Exception e) {
            this.logger.warn(label + " 连接关闭异常", e);
        }
    }

    /**
     * Sends the packet as a persistent JMS text message to the pipeline's queue.
     *
     * @return {@code true} when the send returned normally, {@code false} on any exception
     *         (the sink's cached session and connection are closed and discarded first)
     */
    private boolean sendToActive(Packet<?> packet, Pipeline pipeline, Sink sink) {
        String label = String.format("%s[%s]", sink.getType(), sink.getUrl());
        try {
            Session activeSession = getActiveSession(sink);
            MessageProducer producer = activeSession.createProducer(activeSession.createQueue(pipeline.getQueue()));
            try {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.send(activeSession.createTextMessage(this.objectMapper.writeValueAsString(packet)));
            } finally {
                // Closed quietly: a close error after a successful send must not trigger a resend.
                closeProducerQuietly(label, producer);
            }
            return true;
        } catch (Exception e) {
            this.logger.warn(String.format("%s[%s] 连接失败", sink.getType(), sink.getUrl()), e);
            discardActiveResources(label, sinkKey(sink));
            return false;
        }
    }

    /** Returns the cached auto-acknowledge session for the sink, creating it if needed. */
    private Session getActiveSession(Sink sink) throws JMSException {
        String key = sinkKey(sink);
        Session cached = this.activeSessions.get(key);
        if (cached == null) {
            synchronized (this.resourceLock) {
                cached = this.activeSessions.get(key);
                if (cached == null) {
                    cached = getActiveConnection(sink).createSession(false, Session.AUTO_ACKNOWLEDGE);
                    this.activeSessions.put(key, cached);
                    this.session = cached;
                }
            }
        }
        return cached;
    }

    /** Returns the cached ActiveMQ connection for the sink, opening one if none is cached. */
    private javax.jms.Connection getActiveConnection(Sink sink) {
        String key = sinkKey(sink);
        javax.jms.Connection connection = this.activeConnections.get(key);
        if (connection == null) {
            synchronized (this.resourceLock) {
                connection = this.activeConnections.get(key);
                if (connection == null) {
                    connection = createActiveConnection(sink);
                    this.activeConnections.put(key, connection);
                }
            }
        }
        return connection;
    }

    /**
     * Opens and starts a new ActiveMQ connection to the sink's URL.
     *
     * @throws RuntimeException wrapping whatever the client library threw
     */
    private javax.jms.Connection createActiveConnection(Sink sink) {
        try {
            javax.jms.Connection createConnection = new ActiveMQConnectionFactory(sink.getUrl()).createConnection();
            createConnection.start();
            return createConnection;
        } catch (Exception e) {
            throw new RuntimeException(String.format("%s[%s] 连接失败", sink.getType(), sink.getUrl()), e);
        }
    }

    /** Forgets and closes the session and connection of one ActiveMQ sink. */
    private void discardActiveResources(String label, String sinkKey) {
        Session staleSession;
        javax.jms.Connection staleConnection;
        synchronized (this.resourceLock) {
            staleSession = this.activeSessions.remove(sinkKey);
            staleConnection = this.activeConnections.remove(sinkKey);
            if (this.session == staleSession) {
                this.session = null;
            }
        }
        closeActiveSession(label, staleSession);
        closeActiveConnection(label, staleConnection);
    }

    /** Closes a JMS session, logging (not propagating) any failure. */
    private void closeActiveSession(String label, Session staleSession) {
        if (staleSession == null) {
            return;
        }
        try {
            staleSession.close();
        } catch (Exception e) {
            this.logger.warn(label + " session 关闭异常", e);
        }
    }

    /** Closes a JMS connection, logging (not propagating) any failure. */
    private void closeActiveConnection(String label, javax.jms.Connection staleConnection) {
        if (staleConnection == null) {
            return;
        }
        try {
            staleConnection.close();
        } catch (Exception e) {
            this.logger.warn(label + " 连接关闭异常", e);
        }
    }

    /** Closes a JMS producer, logging (not propagating) any failure. */
    private void closeProducerQuietly(String label, MessageProducer producer) {
        try {
            producer.close();
        } catch (Exception e) {
            this.logger.warn(label + " producer 关闭异常", e);
        }
    }

    /**
     * Sends the packet as JSON through {@link MqV2Service#sendToQueue}.
     *
     * @return {@code true} when the service answered with a non-error result,
     *         {@code false} when it returned {@code null}, an error result, or threw
     */
    private boolean sendToMiddle(Packet<?> packet, Pipeline pipeline, Sink sink) {
        try {
            JsonResult sendToQueue = this.mqV2Service.sendToQueue(getMiddleQueue(pipeline, sink), this.objectMapper.writeValueAsString(packet));
            if (sendToQueue == null) {
                throw new RuntimeException("服务不可用");
            }
            if (Objects.equals(JsonResult.ERR, sendToQueue.getResult())) {
                throw new RuntimeException(sendToQueue.getDesc());
            }
            return true;
        } catch (Exception e) {
            this.logger.warn(String.format("%s[%s] 请求失败", sink.getType(), sink.getUrl()), e);
            return false;
        }
    }

    /**
     * Returns the queue name to use for the middle service, recording it per sink the
     * first time it is seen.
     */
    private String getMiddleQueue(Pipeline pipeline, Sink sink) {
        synchronized (this.bookkeepingLock) {
            Set<String> known = this.middleQueues.get(sink.getId());
            if (known == null) {
                known = new HashSet<>();
                this.middleQueues.put(sink.getId(), known);
            }
            String queue = pipeline.getQueue();
            if (!known.contains(queue)) {
                queue = createMiddleQueue(pipeline, sink);
                known.add(queue);
            }
            return queue;
        }
    }

    /** Currently a pass-through: returns the pipeline's queue name unchanged. */
    private String createMiddleQueue(Pipeline pipeline, Sink sink) {
        return pipeline.getQueue();
    }

    /**
     * Sleeps for {@code j} milliseconds.
     *
     * <p>The interrupt status is deliberately not restored: the callers sleep inside retry
     * loops, and a set interrupt flag would make every later sleep fail immediately and
     * turn those loops into busy-spins.
     *
     * @return {@code true} if the full sleep elapsed, {@code false} if it was cut short
     */
    private boolean sleepx(long j) {
        try {
            Thread.sleep(j);
            return true;
        } catch (Exception e) {
            this.logger.warn(StringUtils.defaultIfBlank(e.getMessage(), "休眠被迫中止"), e);
            return false;
        }
    }

    public void setSinkStorage(SinkStorage sinkStorage) {
        this.sinkStorage = sinkStorage;
    }

    public void setMqV2Service(MqV2Service mqV2Service) {
        this.mqV2Service = mqV2Service;
    }
}
