package qunar.tc.qmq.producer;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.MessageSendStateListener;
import qunar.tc.qmq.MessageStore;
import qunar.tc.qmq.ProduceMessage;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.metrics.QmqCounter;
import qunar.tc.qmq.metrics.QmqMeter;
import qunar.tc.qmq.tracing.TraceUtil;

/* loaded from: input_file:qunar/tc/qmq/producer/ProduceMessageImpl.class */
/**
 * {@link ProduceMessage} implementation that wraps a {@link BaseMessage} and drives it through a
 * {@link QueueSender}, optionally backed by a {@link MessageStore}.
 *
 * <p>The send life cycle is tracked in {@link #state}:
 * <ul>
 *   <li>{@code INIT -> QUEUED} when {@link #send()} (or an internal resend) starts a send attempt;</li>
 *   <li>{@code FINISH} once {@link #finish()} is called;</li>
 *   <li>{@code ERROR} once {@link #error(Exception)} or {@link #failed()} is called;</li>
 *   <li>{@code BLOCK} once {@link #block()} is called.</li>
 * </ul>
 * A resend resets the state to {@code INIT} before queuing again.
 *
 * <p>NOTE(review): nothing in this class calls {@code Span#finish()} on the span created in
 * {@code doSend()}, and both scope activations pass {@code false} as their second argument
 * (per the OpenTracing API this is "finish span on close"). Confirm the span is finished elsewhere.
 */
class ProduceMessageImpl implements ProduceMessage {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProduceMessageImpl.class);

    // Values stored in {@link #state}.
    private static final int INIT = 0;
    private static final int QUEUED = 1;
    private static final int FINISH = 100;
    private static final int ERROR = -1;
    private static final int BLOCK = -2;

    /**
     * Timeout handed to {@code QueueSender.offer(ProduceMessage, long)} when the first, non-blocking
     * offer is rejected. Unit is whatever {@code QueueSender} defines - presumably milliseconds, TODO confirm.
     */
    private static final long ENQUEUE_RETRY_TIMEOUT = 50L;

    /** Trace event recorded when a {@link MessageStore} operation throws. */
    private static final String STORE_FAILED_EVENT = "Qmq.Store.Failed";

    private static final QmqCounter sendCount = Metrics.counter("qmq_client_send_count");
    private static final QmqCounter sendOkCount = Metrics.counter("qmq_client_send_ok_count");
    private static final QmqMeter sendOkQps = Metrics.meter("qmq_client_send_ok_qps");
    private static final QmqCounter sendErrorCount = Metrics.counter("qmq_client_send_error_count");
    private static final QmqCounter sendFailCount = Metrics.counter("qmq_client_send_fail_count");
    private static final QmqCounter resendCount = Metrics.counter("qmq_client_resend_count");
    private static final QmqCounter enterQueueFail = Metrics.counter("qmq_client_enter_queue_fail");

    private final BaseMessage base;
    private final QueueSender sender;

    /** Current life-cycle state; one of INIT, QUEUED, FINISH, ERROR, BLOCK. */
    private final AtomicInteger state = new AtomicInteger(INIT);
    /** Number of send attempts started so far (incremented once per {@code doSend()}). */
    private final AtomicInteger tries = new AtomicInteger(0);
    private final Tracer tracer = GlobalTracer.get();

    /** Upper bound compared against {@link #tries} in {@link #error(Exception)} to decide on a resend. */
    private int sendTryCount;
    /** Span created when the message is offered to the queue; re-activated by {@link #startSendTrace()}. */
    private Span traceSpan;
    /** Scope opened by {@link #startSendTrace()}; closed by {@code closeTrace()}. */
    private Scope traceScope;
    /** Optional callback notified on success/failure; may be {@code null}. */
    private MessageSendStateListener sendStateListener;
    private boolean syncSend;
    /** Optional store; when {@code null} no store operation is performed. */
    private MessageStore store;
    /** Value returned by {@code store.insertNew(this)} in {@link #save()}. */
    private long sequence;
    private transient Object routeKey;

    public ProduceMessageImpl(BaseMessage baseMessage, QueueSender queueSender) {
        this.base = baseMessage;
        this.sender = queueSender;
    }

    /**
     * Counts the send, injects trace data into the message and starts the first send attempt.
     *
     * @throws IllegalStateException if the message is not in the INIT state (it was already queued)
     */
    @Override
    public void send() {
        sendCount.inc();
        attachTraceData();
        doSend();
    }

    /**
     * Performs one send attempt: either a direct send (see {@link #sendSync()}) or an offer to the
     * sender's queue inside a "Qmq.QueueSender.Send" tracing scope.
     *
     * @throws IllegalStateException if the state cannot be moved from INIT to QUEUED
     */
    private void doSend() {
        if (!state.compareAndSet(INIT, QUEUED)) {
            enterQueueFail.inc();
            throw new IllegalStateException("同一条消息不能被入队两次.");
        }
        tries.incrementAndGet();
        if (sendSync()) {
            return;
        }

        // try-with-resources closes the scope on every exit path, including exceptions.
        try (Scope scope = tracer.buildSpan("Qmq.QueueSender.Send").startActive(false)) {
            traceSpan = scope.span();

            if (sender.offer(this)) {
                LOGGER.info("进入发送队列 {}:{}", getSubject(), getMessageId());
                return;
            }

            if (store != null) {
                // A store is configured: give up on the in-memory queue and go through failed().
                enterQueueFail.inc();
                LOGGER.info("内存发送队列已满! 此消息将暂时丢弃,等待补偿服务处理 {}:{}", getSubject(), getMessageId());
                failed();
                return;
            }

            // No store: retry the offer once with a timeout before reporting failure.
            LOGGER.info("内存发送队列已满! 此消息在用户进程阻塞,等待队列激活 {}:{}", getSubject(), getMessageId());
            if (sender.offer(this, ENQUEUE_RETRY_TIMEOUT)) {
                LOGGER.info("进入发送队列 {}:{}", getSubject(), getMessageId());
            } else {
                enterQueueFail.inc();
                LOGGER.info("由于无法入队,发送失败！取消发送 {}:{}", getSubject(), getMessageId());
                onFailed();
            }
        }
    }

    /**
     * Hands the message directly to {@code sender.send(this)} when sync send is enabled and no
     * store is configured.
     *
     * @return {@code true} if the direct send path was taken, {@code false} otherwise
     */
    private boolean sendSync() {
        if (store != null || !syncSend) {
            return false;
        }
        sender.send(this);
        return true;
    }

    /**
     * Marks the message as FINISH, lets the store (if any) finish it, then notifies the success
     * listener and closes the send trace scope.
     *
     * <p>The store is skipped when none is configured or when {@code base.isStoreAtFailed()} is
     * true. An exception thrown by the store is recorded and logged but not propagated, so the
     * success notification always runs.
     */
    @Override
    public void finish() {
        state.set(FINISH);
        try {
            if (store == null || base.isStoreAtFailed()) {
                return;
            }
            store.finish(this);
        } catch (Exception e) {
            recordStoreFailure(e);
        } finally {
            onSuccess();
            closeTrace();
        }
    }

    /** Updates the success metrics and notifies the listener, if one is set. */
    private void onSuccess() {
        sendOkCount.inc();
        sendOkQps.mark();
        if (sendStateListener != null) {
            sendStateListener.onSuccess(base);
        }
    }

    /**
     * Marks the message as ERROR and either resends it (while fewer than {@code sendTryCount}
     * attempts have been made) or reports it as failed. The send trace scope is closed afterwards.
     *
     * @param exc the failure cause; not used by this implementation
     */
    @Override
    public void error(Exception exc) {
        state.set(ERROR);
        try {
            if (tries.get() < sendTryCount) {
                sendErrorCount.inc();
                LOGGER.info("发送失败, 重新发送. tryCount: {} {}:{}", tries.get(), getSubject(), getMessageId());
                resend();
            } else {
                failed();
            }
        } finally {
            closeTrace();
        }
    }

    /**
     * Marks the message as BLOCK, informs the store (if any), then notifies the failure listener
     * and closes the send trace scope. An exception thrown by the store is recorded and logged but
     * not propagated.
     */
    @Override
    public void block() {
        try {
            state.set(BLOCK);
            // NOTE(review): the previous code also had a "store == null && syncSend" branch throwing
            // RuntimeException("消息被拒绝且没有store可恢复,请检查应用授权配置"), but it sat after this early
            // return and could never execute. Confirm whether sync senders without a store are
            // supposed to see an exception here before reinstating it.
            if (store == null) {
                return;
            }
            try {
                store.block(this);
            } catch (Exception e) {
                recordStoreFailure(e);
            }
            LOGGER.info("消息被拒绝");
        } finally {
            onFailed();
            closeTrace();
        }
    }

    /**
     * Marks the message as ERROR without further retries. When a store is configured and
     * {@code base.isStoreAtFailed()} is true the message is inserted into the store. The failure
     * listener is always notified and the send trace scope closed. An exception thrown by the
     * store is recorded and logged but not propagated.
     */
    @Override
    public void failed() {
        state.set(ERROR);
        try {
            sendErrorCount.inc();
            LOGGER.info("发送失败, 已尝试" + tries.get() + "次不再尝试重新发送.");
            // NOTE(review): the previous code also had a "store == null && syncSend" branch that
            // rethrew the message above as a RuntimeException, but it sat after this early return
            // and could never execute. Confirm the intended contract before reinstating it.
            if (store == null) {
                return;
            }
            try {
                if (base.isStoreAtFailed()) {
                    store.insertNew(this);
                }
            } catch (Exception e) {
                recordStoreFailure(e);
            }
        } finally {
            onFailed();
            closeTrace();
        }
    }

    /** Records a "send_failed" trace event, updates the failure metric and notifies the listener, if set. */
    private void onFailed() {
        TraceUtil.recordEvent("send_failed", tracer);
        sendFailCount.inc();
        if (sendStateListener != null) {
            sendStateListener.onFailed(base);
        }
    }

    /** Records a store failure as a trace event and logs the cause instead of silently dropping it. */
    private void recordStoreFailure(Exception e) {
        TraceUtil.recordEvent(STORE_FAILED_EVENT);
        LOGGER.warn("message store operation failed {}:{}", getSubject(), getMessageId(), e);
    }

    /**
     * Resets the trace references and the state to INIT, records a "retry" trace event and starts
     * a new send attempt.
     *
     * <p>NOTE(review): {@code traceScope} is dropped here without being closed; if a scope opened by
     * {@link #startSendTrace()} is still active it stays open. Confirm whether that is intended.
     */
    private void resend() {
        resendCount.inc();
        traceSpan = null;
        traceScope = null;
        state.set(INIT);
        TraceUtil.recordEvent("retry", tracer);
        doSend();
    }

    @Override
    public String getMessageId() {
        return base.getMessageId();
    }

    @Override
    public String getSubject() {
        return base.getSubject();
    }

    /**
     * Re-activates the span captured when the message was queued (if any) and injects trace data
     * into the message again. Does nothing when no span was captured.
     */
    @Override
    public void startSendTrace() {
        if (traceSpan == null) {
            return;
        }
        traceScope = tracer.scopeManager().activate(traceSpan, false);
        attachTraceData();
    }

    @Override
    public void setStore(MessageStore messageStore) {
        this.store = messageStore;
    }

    /**
     * Inserts this message into the store and remembers the returned sequence.
     * Requires a store to have been set via {@link #setStore(MessageStore)}.
     */
    @Override
    public void save() {
        this.sequence = store.insertNew(this);
    }

    @Override
    public long getSequence() {
        return sequence;
    }

    @Override
    public void setRouteKey(Object obj) {
        this.routeKey = obj;
    }

    @Override
    public Object getRouteKey() {
        return routeKey;
    }

    /** Delegates to {@code TraceUtil.inject(base, tracer)}. */
    private void attachTraceData() {
        TraceUtil.inject(base, tracer);
    }

    /** Closes the scope opened by {@link #startSendTrace()}, if there is one. */
    private void closeTrace() {
        if (traceScope != null) {
            traceScope.close();
        }
    }

    @Override
    public BaseMessage getBase() {
        return base;
    }

    public void setSendTryCount(int i) {
        this.sendTryCount = i;
    }

    public void setSendStateListener(MessageSendStateListener messageSendStateListener) {
        this.sendStateListener = messageSendStateListener;
    }

    public void setSyncSend(boolean z) {
        this.syncSend = z;
    }
}
