package qunar.tc.qmq.producer.sender;

import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.ProduceMessage;
import qunar.tc.qmq.batch.BatchExecutor;
import qunar.tc.qmq.batch.Processor;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.QmqTimer;
import qunar.tc.qmq.netty.exception.SubjectNotAssignedException;
import qunar.tc.qmq.producer.QueueSender;
import qunar.tc.qmq.producer.SendErrorHandler;
import qunar.tc.qmq.service.exceptions.MessageException;
import qunar.tc.qmq.tracing.TraceUtil;

/* loaded from: input_file:qunar/tc/qmq/producer/sender/RPCQueueSender.class */
class RPCQueueSender implements QueueSender, SendErrorHandler, Processor<ProduceMessage> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RPCQueueSender.class);
    private final BatchExecutor<ProduceMessage> executor;
    private final RouterManager routerManager;
    private final QmqTimer timer = Metrics.timer("qmq_client_send_task_timer");

    public RPCQueueSender(String str, int i, int i2, int i3, RouterManager routerManager) {
        this.routerManager = routerManager;
        this.executor = new BatchExecutor<>(str, i3, this);
        this.executor.setQueueSize(i);
        this.executor.setThreads(i2);
        this.executor.init();
    }

    @Override // qunar.tc.qmq.producer.QueueSender
    public boolean offer(ProduceMessage produceMessage) {
        return this.executor.addItem(produceMessage);
    }

    @Override // qunar.tc.qmq.producer.QueueSender
    public boolean offer(ProduceMessage produceMessage, long j) {
        try {
            return this.executor.addItem(produceMessage, j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    @Override // qunar.tc.qmq.producer.QueueSender
    public void send(ProduceMessage produceMessage) {
        process(Arrays.asList(produceMessage));
    }

    @Override // qunar.tc.qmq.batch.Processor
    public void process(List<ProduceMessage> list) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Iterator<MessageSenderGroup> it = groupBy(list).iterator();
            while (it.hasNext()) {
                it.next().send();
            }
        } finally {
            this.timer.update(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
        }
    }

    private Collection<MessageSenderGroup> groupBy(List<ProduceMessage> list) {
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < list.size(); i++) {
            ProduceMessage produceMessage = list.get(i);
            produceMessage.startSendTrace();
            Connection routeOf = this.routerManager.routeOf(produceMessage.getBase());
            MessageSenderGroup messageSenderGroup = (MessageSenderGroup) newHashMap.get(routeOf);
            if (messageSenderGroup == null) {
                messageSenderGroup = new MessageSenderGroup(this, routeOf);
                newHashMap.put(routeOf, messageSenderGroup);
            }
            messageSenderGroup.addMessage(produceMessage);
        }
        return newHashMap.values();
    }

    @Override // qunar.tc.qmq.producer.SendErrorHandler
    public void error(ProduceMessage produceMessage, Exception exc) {
        if (!(exc instanceof SubjectNotAssignedException)) {
            LOGGER.warn("Message 发送失败! {}", produceMessage.getMessageId(), exc);
        }
        TraceUtil.recordEvent("error");
        produceMessage.error(exc);
    }

    @Override // qunar.tc.qmq.producer.SendErrorHandler
    public void failed(ProduceMessage produceMessage, Exception exc) {
        LOGGER.warn("Message 发送失败! {}", produceMessage.getMessageId(), exc);
        TraceUtil.recordEvent("failed ");
        produceMessage.failed();
    }

    @Override // qunar.tc.qmq.producer.SendErrorHandler
    public void block(ProduceMessage produceMessage, MessageException messageException) {
        LOGGER.warn("Message 发送失败! {},被server拒绝,请检查应用授权配置,如果需要恢复消息请手工到db恢复状态", produceMessage.getMessageId(), messageException);
        TraceUtil.recordEvent("block");
        produceMessage.block();
    }

    @Override // qunar.tc.qmq.producer.SendErrorHandler
    public void finish(ProduceMessage produceMessage, Exception exc) {
        LOGGER.info("发送成功 {}:{}", produceMessage.getSubject(), produceMessage.getMessageId());
        produceMessage.finish();
    }

    @Override // qunar.tc.qmq.producer.QueueSender
    public void destroy() {
        this.executor.destroy();
    }
}
