package qunar.tc.qmq.netty.client;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.concurrent.NamedThreadFactory;
import qunar.tc.qmq.netty.client.ResponseFuture;
import qunar.tc.qmq.netty.exception.ClientSendException;
import qunar.tc.qmq.protocol.Datagram;
import qunar.tc.qmq.util.RemoteHelper;

/* JADX INFO: Access modifiers changed from: package-private */
@ChannelHandler.Sharable
/* loaded from: input_file:qunar/tc/qmq/netty/client/NettyClientHandler.class */
public class NettyClientHandler extends SimpleChannelInboundHandler<Datagram> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientHandler.class);
    private static final long CLEAN_RESPONSE_TABLE_PERIOD_MILLIS = 1000;
    private final AtomicInteger opaque = new AtomicInteger(0);
    private final ConcurrentMap<Channel, ConcurrentMap<Integer, ResponseFuture>> requestsInFlight = new ConcurrentHashMap(4);
    private final ScheduledExecutorService timeoutTracker = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("qmq-client-clean"));

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyClientHandler() {
        this.timeoutTracker.scheduleAtFixedRate(new Runnable() { // from class: qunar.tc.qmq.netty.client.NettyClientHandler.1
            @Override // java.lang.Runnable
            public void run() {
                NettyClientHandler.this.processTimeouts();
            }
        }, 3000L, CLEAN_RESPONSE_TABLE_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResponseFuture newResponse(Channel channel, long j, ResponseFuture.Callback callback) throws ClientSendException {
        int andIncrement = this.opaque.getAndIncrement();
        ResponseFuture responseFuture = new ResponseFuture(andIncrement, j, callback);
        ConcurrentMap<Integer, ResponseFuture> concurrentMap = this.requestsInFlight.get(channel);
        if (concurrentMap == null) {
            concurrentMap = new ConcurrentHashMap();
            ConcurrentMap<Integer, ResponseFuture> putIfAbsent = this.requestsInFlight.putIfAbsent(channel, concurrentMap);
            if (putIfAbsent != null) {
                concurrentMap = putIfAbsent;
            }
        }
        if (concurrentMap.putIfAbsent(Integer.valueOf(andIncrement), responseFuture) != null) {
            throw new ClientSendException(ClientSendException.SendErrorCode.ILLEGAL_OPAQUE);
        }
        return responseFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeResponse(Channel channel, ResponseFuture responseFuture) {
        ConcurrentMap<Integer, ResponseFuture> concurrentMap = this.requestsInFlight.get(channel);
        if (concurrentMap == null) {
            return;
        }
        concurrentMap.remove(Integer.valueOf(responseFuture.getOpaque()), responseFuture);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Datagram datagram) {
        if (datagram == null) {
            return;
        }
        try {
            processResponse(channelHandlerContext, datagram);
        } catch (Exception e) {
            LOGGER.error("processResponse exception", e);
        }
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        ConcurrentMap<Integer, ResponseFuture> remove = this.requestsInFlight.remove(channelHandlerContext.channel());
        if (remove == null) {
            return;
        }
        Iterator<Map.Entry<Integer, ResponseFuture>> it = remove.entrySet().iterator();
        while (it.hasNext()) {
            ResponseFuture value = it.next().getValue();
            value.completeByTimeoutClean();
            value.executeCallbackOnlyOnce();
        }
    }

    private void processResponse(ChannelHandlerContext channelHandlerContext, Datagram datagram) {
        int opaque = datagram.getHeader().getOpaque();
        ConcurrentMap<Integer, ResponseFuture> concurrentMap = this.requestsInFlight.get(channelHandlerContext.channel());
        if (concurrentMap == null) {
            return;
        }
        ResponseFuture remove = concurrentMap.remove(Integer.valueOf(opaque));
        if (remove == null) {
            LOGGER.warn("receive response, but not matched any request, maybe response timeout or channel had been closed, {}", RemoteHelper.parseChannelRemoteAddress(channelHandlerContext.channel()));
        } else {
            remove.completeByReceiveResponse(datagram);
            remove.executeCallbackOnlyOnce();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processTimeouts() {
        LinkedList linkedList = new LinkedList();
        Iterator<Map.Entry<Channel, ConcurrentMap<Integer, ResponseFuture>>> it = this.requestsInFlight.entrySet().iterator();
        while (it.hasNext()) {
            Iterator<Map.Entry<Integer, ResponseFuture>> it2 = it.next().getValue().entrySet().iterator();
            while (it2.hasNext()) {
                ResponseFuture value = it2.next().getValue();
                if (isTimeout(value)) {
                    value.completeByTimeoutClean();
                    it2.remove();
                    linkedList.add(value);
                    LOGGER.warn("remove timeout request, " + value);
                }
            }
        }
        executeCallbacks(linkedList);
    }

    private boolean isTimeout(ResponseFuture responseFuture) {
        return responseFuture.getTimeout() >= 0 && responseFuture.getBeginTime() + responseFuture.getTimeout() <= System.currentTimeMillis();
    }

    private void executeCallbacks(List<ResponseFuture> list) {
        Iterator<ResponseFuture> it = list.iterator();
        while (it.hasNext()) {
            try {
                it.next().executeCallbackOnlyOnce();
            } catch (Throwable th) {
                LOGGER.warn("scanResponseTable, operationComplete Exception", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() {
        this.timeoutTracker.shutdown();
    }
}
