package com.aliyun.mns.extended.fetcher;

import com.alibaba.fastjson.JSON;
import com.aliyun.mns.extended.javamessaging.Constants;
import com.aliyun.mns.extended.javamessaging.MNSMessageConsumer;
import com.aliyun.mns.extended.javamessaging.MNSQueueWrapper;
import com.aliyun.mns.extended.javamessaging.acknowledge.Acknowledger;
import com.aliyun.mns.extended.javamessaging.message.MNSBytesMessage;
import com.aliyun.mns.extended.javamessaging.message.MNSJsonableMessage;
import com.aliyun.mns.extended.javamessaging.message.MNSJsonableProperty;
import com.aliyun.mns.extended.javamessaging.message.MNSMessage;
import com.aliyun.mns.extended.javamessaging.message.MNSObjectMessage;
import com.aliyun.mns.extended.javamessaging.message.MNSTextMessage;
import com.aliyun.mns.extended.util.ExponentialBackoffStrategy;
import java.util.HashMap;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/aliyun/mns/extended/fetcher/MessageFetcher.class */
/**
 * Background worker that polls a queue for messages and hands each one to the
 * registered JMS {@link MessageListener}.
 *
 * <p>The fetch loop in {@link #run()} blocks until the fetcher has been
 * {@link #start() started} and a listener has been set, then repeatedly pops a
 * message from the queue, converts it to a JMS message and invokes the
 * listener. All state transitions (start, stop, close, listener change) are
 * signalled through a single monitor, {@code stateLock}.
 */
public class MessageFetcher implements Runnable, MessageFetcherManager {
    private static final Log LOG = LogFactory.getLog(MessageFetcher.class);

    /** Wait time handed to {@code popMessage}; presumably seconds, per the constant's name. */
    private static final int POLLING_WAIT_SECONDS = 30;

    /**
     * Acknowledge mode under which the fetcher acknowledges on behalf of the
     * listener. The value matches {@code javax.jms.Session.AUTO_ACKNOWLEDGE}.
     */
    private static final int AUTO_ACKNOWLEDGE = 1;

    private MNSMessageConsumer consumer;
    private final MNSQueueWrapper queue;
    private final Acknowledger acknowledger;
    private final int ackMode;

    /**
     * Volatile because it is written by caller threads in
     * {@link #setMessageListener} / {@link #close()} and read by the fetch
     * thread outside the lock in {@link #processReceivedMessage}.
     */
    private volatile MessageListener listener = null;

    protected int messagesPrefetched = 0;

    /** Number of consecutive failed receive attempts; reset after every successful pop. */
    protected int retriesAttempted = 0;

    // NOTE(review): the meaning of the three constructor arguments is defined by
    // ExponentialBackoffStrategy, which is not visible here - confirm before tuning.
    protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy(25, 25, 2000);

    protected volatile boolean closed = false;
    protected volatile boolean running = false;

    /** Monitor guarding state transitions and used for wait/notify between threads. */
    private final Object stateLock = new Object();

    /**
     * Immutable pairing of a JMS message with the {@link MessageFetcherManager}
     * it is associated with.
     */
    public static class MessageManager {
        private final MessageFetcherManager prefetchManager;
        private final Message message;

        /**
         * @param messageFetcherManager manager to associate with the message
         * @param message               the JMS message
         */
        public MessageManager(MessageFetcherManager messageFetcherManager, Message message) {
            this.prefetchManager = messageFetcherManager;
            this.message = message;
        }

        /** @return the manager supplied at construction */
        public MessageFetcherManager getPrefetchManager() {
            return this.prefetchManager;
        }

        /** @return the message supplied at construction */
        public Message getMessage() {
            return this.message;
        }
    }

    /**
     * Creates a fetcher that is neither running nor closed and has no listener.
     *
     * @param queue        queue wrapper that messages are popped from
     * @param acknowledger acknowledger passed to every JMS message created
     * @param ackMode      acknowledge mode; when equal to {@code 1}
     *                     (auto-acknowledge) messages are acknowledged after the
     *                     listener returns normally
     */
    public MessageFetcher(MNSQueueWrapper queue, Acknowledger acknowledger, int ackMode) {
        this.queue = queue;
        this.acknowledger = acknowledger;
        this.ackMode = ackMode;
    }

    /**
     * Installs (or clears, when {@code null}) the listener that receives
     * messages. If a non-null listener is set while the fetcher is running,
     * threads waiting on the state monitor are woken so the fetch loop can
     * proceed.
     *
     * @param messageListener the new listener, may be {@code null}
     */
    public void setMessageListener(MessageListener messageListener) {
        this.listener = messageListener;
        if (messageListener == null || isClosed()) {
            return;
        }
        synchronized (this.stateLock) {
            if (this.running && !isClosed()) {
                notifyStateChange();
            }
        }
    }

    /** @return the currently installed listener, or {@code null} if none */
    public MessageListener getMessageListener() {
        return this.listener;
    }

    /**
     * Fetch loop. Each iteration waits until the fetcher is started and has a
     * listener, pops one message and dispatches it. The loop ends when the
     * fetcher is closed or the thread is interrupted; any other failure is
     * logged and the loop continues.
     */
    @Override
    public void run() {
        while (true) {
            try {
                if (isClosed()) {
                    return;
                }
                synchronized (this.stateLock) {
                    waitForStart();
                    waitForListener();
                }
                com.aliyun.mns.model.Message message = null;
                // The wait methods also return when the fetcher is closed, so re-check.
                if (!isClosed()) {
                    message = getMessage();
                }
                if (message != null) {
                    processReceivedMessage(message);
                }
            } catch (InterruptedException e) {
                // Restore the flag so whoever owns this thread can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                LOG.error("Unexpected exception when fetch messages:", th);
            }
        }
    }

    /**
     * Pops one message from the queue. When the pop fails, the failure is
     * logged and the thread sleeps for the delay given by the backoff strategy
     * before returning {@code null}.
     *
     * @return the popped message, or {@code null} when none was received or
     *         the pop failed
     * @throws InterruptedException if interrupted during the backoff sleep
     */
    protected com.aliyun.mns.model.Message getMessage() throws InterruptedException {
        com.aliyun.mns.model.Message message = null;
        try {
            message = this.queue.popMessage(POLLING_WAIT_SECONDS);
            if (message == null) {
                LOG.debug("messages null");
            }
            this.retriesAttempted = 0;
        } catch (Exception e) {
            LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", e);
            try {
                int attempt = this.retriesAttempted;
                this.retriesAttempted = attempt + 1;
                sleep(this.backoffStrategy.delayBeforeNextRetry(attempt));
            } catch (InterruptedException interrupted) {
                LOG.warn("Interrupted while retrying on receive", interrupted);
                throw interrupted;
            }
        }
        return message;
    }

    /**
     * Converts a raw queue message into the matching JMS message type.
     *
     * <p>The body is parsed as a JSON-encoded {@link MNSJsonableMessage}. The
     * property named {@link MNSMessage#JMS_MNS_MESSAGE_TYPE} selects the JMS
     * type (text when absent); every other property is copied into the
     * property map of the resulting message.
     *
     * @param message the raw message popped from the queue
     * @return a bytes, object or text JMS message
     * @throws JMSException if the body does not parse to a message, the type
     *                      is not supported, or message construction fails
     */
    protected Message convertToJMSMessage(com.aliyun.mns.model.Message message) throws JMSException {
        String body = message.getMessageBodyAsString();
        // Log only the ID: payloads may be large or sensitive and do not belong in INFO logs.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received MNS message: " + message.getMessageId());
        }

        MNSJsonableMessage parsed = JSON.parseObject(body, MNSJsonableMessage.class);
        if (parsed == null) {
            throw new JMSException("Message body could not be parsed as a JMS message");
        }

        // Raw type retained on purpose: the generic signature expected by the
        // message constructors is not visible from this file.
        HashMap jmsProperties = new HashMap();
        String messageType = MNSMessage.TEXT_MESSAGE_TYPE;
        List<MNSJsonableProperty> properties = parsed.getProperties();
        if (properties != null) {
            for (MNSJsonableProperty property : properties) {
                if (MNSMessage.JMS_MNS_MESSAGE_TYPE.equals(property.getPropertyName())) {
                    messageType = property.getPropertyValue();
                } else {
                    jmsProperties.put(property.getPropertyName(),
                            new MNSMessage.JMSMessagePropertyValue(property.getPropertyValue(), property.getPropertyType()));
                }
            }
        }

        String queueUrl = this.queue.getQueueURL();
        String receiptHandle = message.getReceiptHandle();

        if (MNSMessage.BYTE_MESSAGE_TYPE.equals(messageType)) {
            try {
                return new MNSBytesMessage(parsed.getMessageBody(), jmsProperties, this.acknowledger, queueUrl, receiptHandle);
            } catch (JMSException e) {
                LOG.warn("MessageReceiptHandle - " + message.getReceiptHandle() + "cannot be serialized to BytesMessage", e);
                throw e;
            }
        }
        if (MNSMessage.OBJECT_MESSAGE_TYPE.equals(messageType)) {
            return new MNSObjectMessage(parsed.getMessageBody(), jmsProperties, this.acknowledger, queueUrl, receiptHandle);
        }
        if (MNSMessage.TEXT_MESSAGE_TYPE.equals(messageType)) {
            return new MNSTextMessage(parsed.getMessageBody(), jmsProperties, this.acknowledger, queueUrl, receiptHandle);
        }
        throw new JMSException("Not a supported JMS message type");
    }

    /**
     * Delivers one raw message to the current listener.
     *
     * <p>With no listener installed the message's visibility timeout is
     * changed to {@code 1} (presumably so it becomes available again quickly -
     * confirm against the queue wrapper). Otherwise the message is converted
     * and passed to the listener. It is acknowledged only when the listener
     * returns normally and the acknowledge mode is auto-acknowledge; a message
     * whose listener threw is left unacknowledged.
     *
     * @param message the raw message; {@code null} is ignored
     * @throws JMSException declared for subclasses; conversion and
     *                      acknowledgement failures are logged here, not thrown
     */
    protected void processReceivedMessage(com.aliyun.mns.model.Message message) throws JMSException {
        if (message == null) {
            return;
        }
        // Read once so a concurrent setMessageListener(null) cannot cause an NPE mid-dispatch.
        MessageListener currentListener = this.listener;
        if (currentListener == null) {
            try {
                this.queue.changeMessageVisibilityTimeout(message.getReceiptHandle(), 1);
            } catch (Exception e) {
                LOG.warn("changeMessageVisibilityTimeout fail: " + message.getReceiptHandle(), e);
            }
            return;
        }
        try {
            Message jmsMessage = convertToJMSMessage(message);
            boolean callbackFailed = false;
            try {
                currentListener.onMessage(jmsMessage);
            } catch (Throwable th) {
                LOG.warn("Exception thrown from onMessage callback for message", th);
                callbackFailed = true;
            }
            if (!callbackFailed && this.ackMode == AUTO_ACKNOWLEDGE) {
                ((MNSMessage) jmsMessage).acknowledge();
            }
            synchronized (this.stateLock) {
                notifyStateChange();
            }
        } catch (JMSException e) {
            LOG.warn("processReceivedMessages Exception: " + message.getMessageId(), e);
        }
    }

    /**
     * Blocks until the fetcher is running or closed.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    protected void waitForStart() throws InterruptedException {
        synchronized (this.stateLock) {
            try {
                // Loop guards against wake-ups caused by unrelated state changes.
                while (!this.running && !isClosed()) {
                    LOG.info("wait for start");
                    this.stateLock.wait();
                    LOG.info("wakeup try to check listener");
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting on consumer start", e);
                throw e;
            }
        }
    }

    /**
     * Blocks until a listener is installed or the fetcher is closed.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    protected void waitForListener() throws InterruptedException {
        synchronized (this.stateLock) {
            try {
                while (this.listener == null && !isClosed()) {
                    LOG.info("wait for listener");
                    this.stateLock.wait();
                    LOG.info("wakeup try to fetch");
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting on listener", e);
                throw e;
            }
        }
    }

    /** Wakes every thread waiting on the state monitor. */
    protected void notifyStateChange() {
        synchronized (this.stateLock) {
            this.stateLock.notifyAll();
        }
    }

    /** Marks the fetcher as running and wakes the fetch loop; no-op when closed or already running. */
    public void start() {
        synchronized (this.stateLock) {
            if (isClosed() || this.running) {
                return;
            }
            this.running = true;
            notifyStateChange();
        }
    }

    /** Marks the fetcher as not running; no-op when closed or already stopped. */
    public void stop() {
        synchronized (this.stateLock) {
            if (isClosed() || !this.running) {
                return;
            }
            this.running = false;
            notifyStateChange();
        }
    }

    /** Closes the fetcher, wakes any waiting thread and drops the listener; no-op when already closed. */
    public void close() {
        synchronized (this.stateLock) {
            if (isClosed()) {
                return;
            }
            this.closed = true;
            notifyStateChange();
            this.listener = null;
        }
    }

    /** @return {@code true} once {@link #close()} has been called */
    protected boolean isClosed() {
        return this.closed;
    }

    /** Wakes threads waiting on the state monitor. */
    @Override
    public void messageDispatched() {
        synchronized (this.stateLock) {
            notifyStateChange();
        }
    }

    /** @return the consumer set via {@link #setMessageConsumer}, or {@code null} */
    @Override
    public MNSMessageConsumer getConsumer() {
        return this.consumer;
    }

    /** @param mNSMessageConsumer the consumer to report from {@link #getConsumer()} */
    public void setMessageConsumer(MNSMessageConsumer mNSMessageConsumer) {
        this.consumer = mNSMessageConsumer;
    }

    /**
     * Sleeps the current thread; kept as a separate method so subclasses can
     * override it.
     *
     * @param j time to sleep in milliseconds
     * @throws InterruptedException if interrupted while sleeping
     */
    protected void sleep(long j) throws InterruptedException {
        Thread.sleep(j);
    }

    /** @return the queue wrapper this fetcher pops from */
    public MNSQueueWrapper getQueue() {
        return this.queue;
    }

    /**
     * Synchronous receive is not supported.
     *
     * @throws JMSException always
     */
    public Message receive() throws Exception {
        throw new JMSException(Constants.UNSUPPORTED_METHOD);
    }

    /**
     * Synchronous receive with a timeout is not supported.
     *
     * @param j ignored
     * @throws JMSException always
     */
    public Message receive(long j) throws Exception {
        throw new JMSException(Constants.UNSUPPORTED_METHOD);
    }
}
