package qunar.tc.qmq.consumer.pull;

import com.google.common.base.Strings;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.config.PullSubjectsConfig;
import qunar.tc.qmq.consumer.pull.PullParam;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.utils.RetrySubjectUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/AbstractPullEntry.class */
public abstract class AbstractPullEntry {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPullEntry.class);
    private static final int MAX_MESSAGE_RETRY_THRESHOLD = 5;
    private final PullService pullService;
    final BrokerService brokerService;
    final AckService ackService;
    private final AtomicReference<Integer> pullRequestTimeout;
    final WeightLoadBalance loadBalance = new WeightLoadBalance();
    private final QmqCounter pullWorkCounter;
    private final QmqCounter pullFailCounter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:qunar/tc/qmq/consumer/pull/AbstractPullEntry$PulledMessageFilterImpl.class */
    public static final class PulledMessageFilterImpl implements PulledMessageFilter {
        private final PullParam pullParam;

        PulledMessageFilterImpl(PullParam pullParam) {
            this.pullParam = pullParam;
        }

        @Override // qunar.tc.qmq.consumer.pull.PulledMessageFilter
        public boolean filter(PulledMessage pulledMessage) {
            if ((this.pullParam.isConsumeMostOnce() && pulledMessage.times() > 1) || pulledMessage.getBooleanProperty(BaseMessage.keys.qmq_corruptData.name())) {
                return false;
            }
            String stringProperty = pulledMessage.getStringProperty(BaseMessage.keys.qmq_consumerGroupName);
            return Strings.isNullOrEmpty(stringProperty) || stringProperty.equals(this.pullParam.getGroup());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractPullEntry(String str, String str2, PullService pullService, AckService ackService, BrokerService brokerService) {
        this.pullService = pullService;
        this.ackService = ackService;
        this.brokerService = brokerService;
        this.pullRequestTimeout = PullSubjectsConfig.get().getPullRequestTimeout(str);
        String[] strArr = {str, str2};
        this.pullWorkCounter = Metrics.counter("qmq_pull_work_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
        this.pullFailCounter = Metrics.counter("qmq_pull_fail_count", MetricsConstants.SUBJECT_GROUP_ARRAY, strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<PulledMessage> pull(ConsumeParam consumeParam, BrokerGroupInfo brokerGroupInfo, int i, int i2, AckHook ackHook) {
        this.pullWorkCounter.inc();
        PullParam buildPullParam = buildPullParam(consumeParam, brokerGroupInfo, this.ackService.getAckSendInfo(brokerGroupInfo, consumeParam.getSubject(), consumeParam.getGroup()), i, i2);
        try {
            List<PulledMessage> handlePullResult = handlePullResult(buildPullParam, this.pullService.pull(buildPullParam), ackHook);
            brokerGroupInfo.markSuccess();
            recordPullSize(brokerGroupInfo, handlePullResult, i);
            return handlePullResult;
        } catch (ExecutionException e) {
            markFailed(brokerGroupInfo);
            if (e.getCause() instanceof TimeoutException) {
                return Collections.emptyList();
            }
            LOGGER.error("pull message exception. {}", buildPullParam, e);
            return Collections.emptyList();
        } catch (Exception e2) {
            markFailed(brokerGroupInfo);
            LOGGER.error("pull message exception. {}", buildPullParam, e2);
            return Collections.emptyList();
        }
    }

    private void markFailed(BrokerGroupInfo brokerGroupInfo) {
        this.pullFailCounter.inc();
        brokerGroupInfo.markFailed();
        this.loadBalance.timeout(brokerGroupInfo);
    }

    private void recordPullSize(BrokerGroupInfo brokerGroupInfo, List<PulledMessage> list, int i) {
        if (list.size() == 0) {
            this.loadBalance.noMessage(brokerGroupInfo);
        } else if (list.size() >= i) {
            this.loadBalance.fetchedEnoughMessages(brokerGroupInfo);
        } else {
            this.loadBalance.fetchedMessages(brokerGroupInfo);
        }
    }

    private PullParam buildPullParam(ConsumeParam consumeParam, BrokerGroupInfo brokerGroupInfo, AckSendInfo ackSendInfo, int i, int i2) {
        return new PullParam.PullParamBuilder().setConsumeParam(consumeParam).setBrokerGroup(brokerGroupInfo).setPullBatchSize(i).setTimeoutMillis(i2).setRequestTimeoutMillis(this.pullRequestTimeout.get().intValue()).setMinPullOffset(ackSendInfo.getMinPullOffset()).setMaxPullOffset(ackSendInfo.getMaxPullOffset()).create();
    }

    private List<PulledMessage> handlePullResult(PullParam pullParam, PullResult pullResult, AckHook ackHook) {
        if (pullResult.getResponseCode() == 52) {
            pullResult.getBrokerGroup().setAvailable(false);
            this.brokerService.refresh(ClientType.CONSUMER, pullParam.getSubject(), pullParam.getGroup());
        }
        List<BaseMessage> messages = pullResult.getMessages();
        if (messages == null || messages.isEmpty()) {
            return Collections.emptyList();
        }
        monitorMessageCount(pullParam, pullResult);
        List<PulledMessage> buildPulledMessages = this.ackService.buildPulledMessages(pullParam, pullResult, ackHook, new PulledMessageFilterImpl(pullParam));
        if (buildPulledMessages == null || buildPulledMessages.isEmpty()) {
            return Collections.emptyList();
        }
        logTimes(buildPulledMessages);
        return buildPulledMessages;
    }

    private void logTimes(List<PulledMessage> list) {
        for (PulledMessage pulledMessage : list) {
            int times = pulledMessage.times();
            if (times > 5) {
                LOGGER.warn("这是第 {} 次收到同一条消息，请注意检查逻辑是否有问题. subject={}, msgId={}", new Object[]{Integer.valueOf(times), RetrySubjectUtils.getRealSubject(pulledMessage.getSubject()), pulledMessage.getMessageId()});
            }
        }
    }

    private static void monitorMessageCount(PullParam pullParam, PullResult pullResult) {
        try {
            Metrics.counter("qmq_pull_message_count", new String[]{"subject", "group", "broker"}, new String[]{pullParam.getSubject(), pullParam.getGroup(), pullParam.getBrokerGroup().getGroupName()}).inc(pullResult.getMessages().size());
        } catch (Exception e) {
            LOGGER.error("AbstractPullEntry monitor exception", e);
        }
    }
}
