package qunar.tc.qmq.consumer.pull;

import com.google.common.base.Supplier;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.consumer.BaseMessageHandler;
import qunar.tc.qmq.consumer.register.RegistParam;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.MetricsConstants;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.metrics.QmqTimer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:qunar/tc/qmq/consumer/pull/PushConsumerImpl.class */
/**
 * {@link PushConsumer} implementation that submits pulled messages to the inherited executor and
 * keeps the messages the executor rejected in an in-memory buffer until
 * {@link #cleanLocalBuffer()} manages to resubmit them.
 *
 * <p>The constructor registers a gauge for the buffer size, two timers and a failure counter, all
 * tagged with the two string arguments it receives.
 */
public class PushConsumerImpl extends BaseMessageHandler implements PushConsumer {
    // Note: the logger is registered under the PushConsumer interface, not this class. Left
    // unchanged so that existing logger configuration keeps matching.
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumer.class);

    private final ConsumeParam consumeParam;

    /** Messages the executor refused to accept; drained again by {@link #cleanLocalBuffer()}. */
    private final LinkedBlockingQueue<PulledMessage> messageBuffer = new LinkedBlockingQueue<>();

    /** Time between a message's created time and the moment its handle task starts running. */
    private final QmqTimer createToHandleTimer;

    /** Wall-clock duration of a handle task's {@code run()}. */
    private final QmqTimer handleTimer;

    /** Incremented once for every handle task that finishes with {@code handleFail} set. */
    private final QmqCounter handleFailCounter;

    /**
     * Handle task that wraps the inherited {@code run()} with metrics: it records how long the
     * message waited before being handled, how long handling took, and whether handling failed.
     */
    public final class HandleTaskImpl extends BaseMessageHandler.HandleTask {
        private final PulledMessage message;

        HandleTaskImpl(PulledMessage pulledMessage, BaseMessageHandler baseMessageHandler) {
            super(pulledMessage, baseMessageHandler);
            this.message = pulledMessage;
        }

        @Override // qunar.tc.qmq.consumer.BaseMessageHandler.HandleTask, java.lang.Runnable
        public void run() {
            final long startMillis = System.currentTimeMillis();
            final long waitedMillis = startMillis - this.message.getCreatedTime().getTime();
            PushConsumerImpl.this.createToHandleTimer.update(waitedMillis, TimeUnit.MILLISECONDS);
            try {
                super.run();
            } finally {
                // Recorded in finally so the metrics are updated even when super.run() throws.
                final long elapsedMillis = System.currentTimeMillis() - startMillis;
                PushConsumerImpl.this.handleTimer.update(elapsedMillis, TimeUnit.MILLISECONDS);
                if (this.handleFail) {
                    PushConsumerImpl.this.handleFailCounter.inc();
                }
            }
        }
    }

    /**
     * Creates the consumer and registers its metrics.
     *
     * @param subject first metric tag value; also forwarded to {@link ConsumeParam}
     *     (presumably the message subject, judging by {@code SUBJECT_GROUP_ARRAY} - confirm)
     * @param group second metric tag value; also forwarded to {@link ConsumeParam}
     *     (presumably the consumer group - confirm)
     * @param registParam supplies the executor and message listener handed to the super
     *     constructor, and is forwarded to {@link ConsumeParam}
     */
    public PushConsumerImpl(String subject, String group, RegistParam registParam) {
        super(registParam.getExecutor(), registParam.getMessageListener());
        this.consumeParam = new ConsumeParam(subject, group, registParam);

        final String[] tagValues = {subject, group};
        Metrics.gauge("qmq_pull_buffer_size", MetricsConstants.SUBJECT_GROUP_ARRAY, tagValues, new Supplier<Double>() {
            // Must be named get() to implement Supplier; the decompiler had renamed it, which
            // left the interface method unimplemented.
            @Override
            public Double get() {
                return Double.valueOf(PushConsumerImpl.this.messageBuffer.size());
            }
        });
        this.createToHandleTimer = Metrics.timer("qmq_pull_createToHandle_timer", MetricsConstants.SUBJECT_GROUP_ARRAY, tagValues);
        this.handleTimer = Metrics.timer("qmq_pull_handle_timer", MetricsConstants.SUBJECT_GROUP_ARRAY, tagValues);
        this.handleFailCounter = Metrics.counter("qmq_pull_handleFail_count", MetricsConstants.SUBJECT_GROUP_ARRAY, tagValues);
    }

    /** Returns the subject held by this consumer's {@link ConsumeParam}. */
    @Override // qunar.tc.qmq.BaseConsumer
    public String subject() {
        return this.consumeParam.getSubject();
    }

    /** Returns the group held by this consumer's {@link ConsumeParam}. */
    @Override // qunar.tc.qmq.BaseConsumer
    public String group() {
        return this.consumeParam.getGroup();
    }

    /** Returns the {@link ConsumeParam} built in the constructor. */
    @Override // qunar.tc.qmq.consumer.pull.PushConsumer
    public ConsumeParam consumeParam() {
        return this.consumeParam;
    }

    /**
     * Resubmits buffered messages to the executor in queue order.
     *
     * <p>A message is removed from the buffer only after the executor accepted it, so a rejected
     * message stays at the head for the next attempt.
     *
     * @return {@code true} if the buffer was drained completely, {@code false} as soon as the
     *     executor rejects a message
     */
    @Override // qunar.tc.qmq.consumer.pull.PushConsumer
    public boolean cleanLocalBuffer() {
        PulledMessage head;
        // Read the head once per iteration so that a null from peek() ends the loop instead of
        // being passed on to push().
        while ((head = this.messageBuffer.peek()) != null) {
            if (!push(head)) {
                return false;
            }
            this.messageBuffer.poll();
        }
        return true;
    }

    /**
     * Submits the messages to the executor one by one. On the first rejection, the rejected
     * message and everything after it are appended to the local buffer and submission stops.
     *
     * @param messages messages to dispatch, in order
     */
    @Override // qunar.tc.qmq.consumer.pull.PushConsumer
    public void push(List<PulledMessage> messages) {
        final int total = messages.size();
        for (int index = 0; index < total; index++) {
            if (!push(messages.get(index))) {
                this.messageBuffer.addAll(messages.subList(index, total));
                return;
            }
        }
    }

    /**
     * Wraps one message in a {@link HandleTaskImpl} and hands it to the executor.
     *
     * @return {@code true} if the executor accepted the task, {@code false} if it threw
     *     {@link RejectedExecutionException}
     */
    private boolean push(PulledMessage pulledMessage) {
        try {
            this.executor.execute(new HandleTaskImpl(pulledMessage, this));
            // Log text: "entered the execution queue".
            LOGGER.info("进入执行队列 {}:{}", pulledMessage.getSubject(), pulledMessage.getMessageId());
            return true;
        } catch (RejectedExecutionException e) {
            // Log text: "message failed to enter the execution queue, please adjust the size of
            // the message-handling thread pool". The exception itself is intentionally not logged,
            // matching the original behaviour.
            LOGGER.error("消息进入执行队列失败，请调整消息处理线程池大小, {}:{}", pulledMessage.getSubject(), pulledMessage.getMessageId());
            return false;
        }
    }

    /**
     * Acknowledges the message through {@link AckHelper#ackWithTrace} unless
     * {@code hasNotAcked()} reports {@code false}. The {@code j} and {@code map} arguments are
     * not used by this implementation.
     */
    @Override // qunar.tc.qmq.consumer.BaseMessageHandler
    protected void ack(BaseMessage baseMessage, long j, Throwable throwable, Map<String, String> map) {
        final PulledMessage pulledMessage = (PulledMessage) baseMessage;
        if (pulledMessage.hasNotAcked()) {
            AckHelper.ackWithTrace(pulledMessage, throwable);
        }
    }

    /**
     * Ack hook: runs {@code applyPostOnMessage} with a copy of the message's filter context and
     * then acknowledges the message through {@link AckHelper#ackWithTrace}.
     */
    @Override // qunar.tc.qmq.consumer.pull.AckHook
    public void call(PulledMessage pulledMessage, Throwable throwable) {
        // Copy the context so the post-processing step works on its own map instance.
        applyPostOnMessage(pulledMessage, throwable, new HashMap<>(pulledMessage.filterContext()));
        AckHelper.ackWithTrace(pulledMessage, throwable);
    }
}
