package qunar.tc.qmq.netty.client;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.netty.exception.ClientSendException;
import qunar.tc.qmq.util.RemoteHelper;

/**
 * Shared Netty pipeline handler that also owns the client-side table of outbound channels,
 * keyed by remote address string.
 *
 * <p>Lookups on the fast path read {@link #channelTables} directly; every structural change
 * (creating a connection, evicting a closed or failed one) is performed while holding
 * {@link #channelLock}, which is acquired with a bounded wait of {@link #LOCK_TIMEOUT_MILLIS}.
 * The lock is always released in a {@code finally} block and only by code paths that actually
 * acquired it.
 */
@ChannelHandler.Sharable
public class NettyConnectManageHandler extends ChannelDuplexHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectManageHandler.class);

    /** Maximum time to wait for {@link #channelLock} before giving up, in milliseconds. */
    private static final long LOCK_TIMEOUT_MILLIS = 5000;

    private final Bootstrap bootstrap;

    /** Passed to {@link ChannelFuture#awaitUninterruptibly(long)}, i.e. a timeout in milliseconds. */
    private final long connectTimeout;

    /** Remote address string -> wrapper around the (possibly still pending) connect future. */
    private final ConcurrentMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();

    /** Guards structural modifications of {@link #channelTables}. */
    private final Lock channelLock = new ReentrantLock();

    // Both limiters are created with 0.2 permits per second to throttle repetitive log lines.
    private final RateLimiter connectFailLogLimit = RateLimiter.create(0.2d);
    private final RateLimiter closeChannelLogLimit = RateLimiter.create(0.2d);

    /**
     * Creates the handler.
     *
     * @param bootstrap bootstrap used to open new connections
     * @param j         connect timeout in milliseconds, used when waiting for a connect future
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyConnectManageHandler(Bootstrap bootstrap, long j) {
        this.bootstrap = bootstrap;
        this.connectTimeout = j;
    }

    /**
     * Closes every channel currently registered in the table and then empties the table.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        // ConcurrentHashMap iterators are weakly consistent, so closeChannel() may remove
        // entries while this loop is running.
        for (ChannelWrapper wrapper : this.channelTables.values()) {
            closeChannel(wrapper.getChannel());
        }
        this.channelTables.clear();
    }

    /**
     * Tries to acquire {@link #channelLock} within {@link #LOCK_TIMEOUT_MILLIS}.
     *
     * @return {@code true} if the calling thread now holds the lock; {@code false} on timeout
     *         or interruption (the interrupt status is restored in the latter case)
     */
    private boolean tryLockChannelTable() {
        try {
            if (this.channelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return true;
            }
            LOGGER.warn("try to lock channel table, but timeout in {}ms.", LOCK_TIMEOUT_MILLIS);
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            LOGGER.warn("try to lock channel table, but be interrupted.");
            return false;
        }
    }

    /**
     * Returns a usable channel for the given remote address, connecting first if necessary.
     *
     * <p>If a connect attempt for the address is already in flight, this call waits on that
     * attempt instead of starting another one. The wait for the connect future is bounded by
     * the configured connect timeout and happens outside the table lock.
     *
     * @param str remote address string; must not be null or empty
     * @return a channel whose wrapper reports {@code isOK()}
     * @throws ClientSendException with {@code EMPTY_ADDRESS} if {@code str} is null or empty, or
     *                             with {@code CREATE_CHANNEL_FAIL} if the table lock could not be
     *                             acquired, or the connection failed or timed out
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Channel getOrCreateChannel(String str) throws ClientSendException {
        if (Strings.isNullOrEmpty(str)) {
            throw new ClientSendException(ClientSendException.SendErrorCode.EMPTY_ADDRESS);
        }

        // Fast path: no locking when a healthy channel is already registered.
        ChannelWrapper channelWrapper = this.channelTables.get(str);
        if (channelWrapper != null && channelWrapper.isOK()) {
            return channelWrapper.getChannel();
        }

        if (!tryLockChannelTable()) {
            // The lock is NOT held here, so there is nothing to release.
            throw new ClientSendException(ClientSendException.SendErrorCode.CREATE_CHANNEL_FAIL, str);
        }
        try {
            boolean needConnect = true;
            // Re-read under the lock: another thread may have connected in the meantime.
            channelWrapper = this.channelTables.get(str);
            if (channelWrapper != null) {
                if (channelWrapper.isOK()) {
                    return channelWrapper.getChannel();
                }
                if (channelWrapper.getChannelFuture().isDone()) {
                    // Finished but unusable: evict it and reconnect.
                    this.channelTables.remove(str);
                } else {
                    // A connect is still pending: wait on it below rather than opening another.
                    needConnect = false;
                }
            }
            if (needConnect) {
                ChannelFuture connect = this.bootstrap.connect(RemoteHelper.string2SocketAddress(str));
                LOGGER.debug("begin to connect remote host: {}", str);
                channelWrapper = new ChannelWrapper(connect);
                this.channelTables.put(str, channelWrapper);
            }
        } catch (Exception e) {
            LOGGER.error("create channel exception. remoteAddr={}", str, e);
        } finally {
            this.channelLock.unlock();
        }

        if (channelWrapper != null) {
            ChannelFuture channelFuture = channelWrapper.getChannelFuture();
            if (channelFuture.awaitUninterruptibly(this.connectTimeout)) {
                if (channelWrapper.isOK()) {
                    LOGGER.debug("connect remote host success: {}", str);
                    return channelWrapper.getChannel();
                }
                if (this.connectFailLogLimit.tryAcquire()) {
                    // The trailing Throwable is logged by SLF4J as the exception of the event.
                    LOGGER.warn("connect remote host fail: {}. {}", str, channelFuture.toString(), channelFuture.cause());
                }
            } else if (this.connectFailLogLimit.tryAcquire()) {
                LOGGER.warn("connect remote host timeout: {}. {}", str, channelFuture.toString());
            }
        }
        throw new ClientSendException(ClientSendException.SendErrorCode.CREATE_CHANNEL_FAIL, str);
    }

    /**
     * Logs the outbound connect request and forwards it down the pipeline.
     *
     * @param socketAddress  remote address being connected to (may be null)
     * @param socketAddress2 local address to bind (may be null)
     */
    @Override
    public void connect(ChannelHandlerContext channelHandlerContext, SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            String local = socketAddress2 == null ? "UNKNOWN" : RemoteHelper.parseSocketAddressAddress(socketAddress2);
            String remote = socketAddress == null ? "UNKNOWN" : RemoteHelper.parseSocketAddressAddress(socketAddress);
            LOGGER.debug("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
        }
        super.connect(channelHandlerContext, socketAddress, socketAddress2, channelPromise);
    }

    /**
     * Removes the channel from the table (closing it if it was registered) and forwards the
     * disconnect request down the pipeline.
     */
    @Override
    public void disconnect(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) throws Exception {
        LOGGER.debug("NETTY CLIENT PIPELINE: DISCONNECT {}", RemoteHelper.parseChannelRemoteAddress(channelHandlerContext.channel()));
        closeChannel(channelHandlerContext.channel());
        super.disconnect(channelHandlerContext, channelPromise);
    }

    /**
     * Removes the channel from the table (closing it if it was registered) and forwards the
     * close request down the pipeline.
     */
    @Override
    public void close(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) throws Exception {
        LOGGER.debug("NETTY CLIENT PIPELINE: CLOSE {}", RemoteHelper.parseChannelRemoteAddress(channelHandlerContext.channel()));
        closeChannel(channelHandlerContext.channel());
        super.close(channelHandlerContext, channelPromise);
    }

    /**
     * Closes the channel when an {@link IdleState#ALL_IDLE} event is received. Other events are
     * ignored; no event is propagated further by this handler.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if ((obj instanceof IdleStateEvent) && ((IdleStateEvent) obj).state().equals(IdleState.ALL_IDLE)) {
            LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", RemoteHelper.parseChannelRemoteAddress(channelHandlerContext.channel()));
            closeChannel(channelHandlerContext.channel());
        }
    }

    /**
     * Logs the pipeline exception and closes the affected channel. The exception is not
     * propagated further by this handler.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", RemoteHelper.parseChannelRemoteAddress(channelHandlerContext.channel()));
        LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", th);
        closeChannel(channelHandlerContext.channel());
    }

    /**
     * Removes the table entry whose wrapper holds exactly this channel instance and closes the
     * channel. Does nothing if {@code channel} is null, if the table lock cannot be acquired, or
     * if the channel is not registered in the table.
     *
     * @param channel channel to evict and close; may be null
     */
    private void closeChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        if (!tryLockChannelTable()) {
            // Lock not held: nothing to release and the table must not be touched.
            return;
        }
        try {
            String remoteAddr = null;
            // Reverse lookup by identity: the table is keyed by address, not by channel.
            for (Map.Entry<String, ChannelWrapper> entry : this.channelTables.entrySet()) {
                ChannelWrapper wrapper = entry.getValue();
                if (wrapper != null && wrapper.getChannel() == channel) {
                    remoteAddr = entry.getKey();
                    break;
                }
            }
            if (remoteAddr == null) {
                LOGGER.debug("close channel but not found in channelTable");
                return;
            }
            this.channelTables.remove(remoteAddr);
            // The second argument of RemoteHelper.closeChannel mirrors whether this close was
            // allowed to log by the rate limiter (presumably a "log it" flag — confirm in RemoteHelper).
            if (this.closeChannelLogLimit.tryAcquire()) {
                LOGGER.info("close channel and remove from channelTable. remoteAddr={}", remoteAddr);
                RemoteHelper.closeChannel(channel, true);
            } else {
                RemoteHelper.closeChannel(channel, false);
            }
        } catch (Exception e) {
            LOGGER.error("closeChannel: close the channel exception", e);
        } finally {
            this.channelLock.unlock();
        }
    }
}
